From bf4a9a1a8e7e1c7ba887a64a5d04939ca2573095 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Mon, 2 Jan 2017 09:44:50 +0800 Subject: [PATCH] HBASE-17372 Make AsyncTable thread safe --- .../hbase/client/AsyncBatchRpcRetryingCaller.java | 83 +---- .../hadoop/hbase/client/AsyncClientScanner.java | 20 +- .../hbase/client/AsyncConnectionConfiguration.java | 18 +- .../hadoop/hbase/client/AsyncConnectionImpl.java | 1 - .../client/AsyncRpcRetryingCallerFactory.java | 85 ++--- .../AsyncScanSingleRegionRpcRetryingCaller.java | 44 +-- .../AsyncSingleRequestRpcRetryingCaller.java | 57 ++- .../client/AsyncSmallScanRpcRetryingCaller.java | 13 +- .../org/apache/hadoop/hbase/client/AsyncTable.java | 30 +- .../apache/hadoop/hbase/client/AsyncTableBase.java | 398 +++++++++++++++++---- .../apache/hadoop/hbase/client/AsyncTableImpl.java | 103 ++---- .../hbase/client/AsyncTableResultScanner.java | 10 +- .../hadoop/hbase/client/ConnectionUtils.java | 13 +- .../hadoop/hbase/client/OperationConfig.java | 144 ++++++++ .../apache/hadoop/hbase/client/RawAsyncTable.java | 26 +- .../hadoop/hbase/client/RawAsyncTableImpl.java | 156 ++++---- .../java/org/apache/hadoop/hbase/HConstants.java | 3 - .../TestAsyncSingleRequestRpcRetryingCaller.java | 39 +- .../client/TestAsyncTableGetMultiThreaded.java | 16 +- .../hadoop/hbase/client/TestRawAsyncTableScan.java | 6 +- 20 files changed, 780 insertions(+), 485 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationConfig.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 6f0b8e9..052b06f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.CellUtil.createCellScanner; import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; -import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; @@ -40,7 +39,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -61,7 +59,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType; -import org.apache.hadoop.hbase.util.AtomicUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -96,17 +93,7 @@ class AsyncBatchRpcRetryingCaller { private final IdentityHashMap> action2Errors; - private final long pauseNs; - - private final int maxAttempts; - - private final long operationTimeoutNs; - - private final long readRpcTimeoutNs; - - private final long writeRpcTimeoutNs; - - private final int startLogErrorsCnt; + private final OperationConfig operationConfig; private final long startNs; @@ -128,40 +115,18 @@ class AsyncBatchRpcRetryingCaller { public final ConcurrentMap actionsByRegion = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); - public final AtomicLong rpcTimeoutNs; - - public ServerRequest(long defaultRpcTimeoutNs) { - this.rpcTimeoutNs = new AtomicLong(defaultRpcTimeoutNs); - } - - public void addAction(HRegionLocation loc, Action action, long rpcTimeoutNs) { + public void addAction(HRegionLocation loc, Action action) { computeIfAbsent(actionsByRegion, loc.getRegionInfo().getRegionName(), () -> new RegionRequest(loc)).actions.add(action); - // try update the timeout to a larger value - if (this.rpcTimeoutNs.get() <= 0) { - return; - } - if (rpcTimeoutNs <= 0) { - this.rpcTimeoutNs.set(-1L); - return; - } - AtomicUtils.updateMax(this.rpcTimeoutNs, rpcTimeoutNs); } } public AsyncBatchRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, - TableName tableName, List actions, long pauseNs, int maxRetries, - long operationTimeoutNs, long readRpcTimeoutNs, long writeRpcTimeoutNs, - int startLogErrorsCnt) { + TableName tableName, List actions, OperationConfig operationConfig) { this.retryTimer = retryTimer; this.conn = conn; this.tableName = tableName; - this.pauseNs = pauseNs; - this.maxAttempts = retries2Attempts(maxRetries); - this.operationTimeoutNs = operationTimeoutNs; - this.readRpcTimeoutNs = readRpcTimeoutNs; - this.writeRpcTimeoutNs = writeRpcTimeoutNs; - this.startLogErrorsCnt = startLogErrorsCnt; + this.operationConfig = operationConfig; this.actions = new ArrayList<>(actions.size()); this.futures = new ArrayList<>(actions.size()); @@ -182,7 +147,7 @@ class AsyncBatchRpcRetryingCaller { } private long remainingTimeNs() { - return operationTimeoutNs - (System.nanoTime() - startNs); + return operationConfig.getOperationTimeoutNs() - (System.nanoTime() - startNs); } private List removeErrors(Action action) { @@ -193,7 +158,7 @@ class AsyncBatchRpcRetryingCaller { private void logException(int tries, Supplier> regionsSupplier, Throwable error, ServerName serverName) { - if (tries > startLogErrorsCnt) { + if (tries > operationConfig.getStartLogErrorsCnt()) { String regions = regionsSupplier.get().map(r -> "'" + r.loc.getRegionInfo().getRegionNameAsString() + "'") .collect(Collectors.joining(",", "[", "]")); @@ -292,7 +257,7 @@ class AsyncBatchRpcRetryingCaller { } else if (result instanceof Throwable) { Throwable error = translateException((Throwable) result); logException(tries, () -> Stream.of(regionReq), error, serverName); - if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { + if (error instanceof DoNotRetryIOException || tries >= operationConfig.getMaxAttempts()) { failOne(action, tries, error, EnvironmentEdgeManager.currentTime(), getExtraContextForError(serverName)); } else { @@ -322,7 +287,7 @@ class AsyncBatchRpcRetryingCaller { error = translateException(t); logException(tries, () -> Stream.of(regionReq), error, serverName); conn.getLocator().updateCachedLocation(regionReq.loc, error); - if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { + if (error instanceof DoNotRetryIOException || tries >= operationConfig.getMaxAttempts()) { failAll(regionReq.actions.stream(), tries, error, serverName); return; } @@ -338,7 +303,7 @@ class AsyncBatchRpcRetryingCaller { private void send(Map actionsByServer, int tries) { long remainingNs; - if (operationTimeoutNs > 0) { + if (operationConfig.getOperationTimeoutNs() > 0) { remainingNs = remainingTimeNs(); if (remainingNs <= 0) { failAll(actionsByServer.values().stream().flatMap(m -> m.actionsByRegion.values().stream()) @@ -366,7 +331,7 @@ class AsyncBatchRpcRetryingCaller { return; } HBaseRpcController controller = conn.rpcControllerFactory.newController(); - resetController(controller, Math.min(serverReq.rpcTimeoutNs.get(), remainingNs)); + resetController(controller, Math.min(operationConfig.getRpcTimeoutNs(), remainingNs)); if (!cells.isEmpty()) { controller.setCellScanner(createCellScanner(cells)); } @@ -390,7 +355,7 @@ class AsyncBatchRpcRetryingCaller { ServerName serverName) { Throwable error = translateException(t); logException(tries, () -> actionsByRegion.values().stream(), error, serverName); - if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { + if (error instanceof DoNotRetryIOException || tries >= operationConfig.getMaxAttempts()) { failAll(actionsByRegion.values().stream().flatMap(r -> r.actions.stream()), tries, error, serverName); return; @@ -403,26 +368,22 @@ class AsyncBatchRpcRetryingCaller { private void tryResubmit(Stream actions, int tries) { long delayNs; - if (operationTimeoutNs > 0) { + if (operationConfig.getOperationTimeoutNs() > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; if (maxDelayNs <= 0) { failAll(actions, tries); return; } - delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); + delayNs = Math.min(maxDelayNs, getPauseTime(operationConfig.getRetryPauseNs(), tries - 1)); } else { - delayNs = getPauseTime(pauseNs, tries - 1); + delayNs = getPauseTime(operationConfig.getRetryPauseNs(), tries - 1); } retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS); } - private long getRpcTimeoutNs(Action action) { - return action.getAction() instanceof Get ? readRpcTimeoutNs : writeRpcTimeoutNs; - } - private void groupAndSend(Stream actions, int tries) { long locateTimeoutNs; - if (operationTimeoutNs > 0) { + if (operationConfig.getOperationTimeoutNs() > 0) { locateTimeoutNs = remainingTimeNs(); if (locateTimeoutNs <= 0) { failAll(actions, tries); @@ -433,15 +394,6 @@ class AsyncBatchRpcRetryingCaller { } ConcurrentMap actionsByServer = new ConcurrentHashMap<>(); ConcurrentLinkedQueue locateFailed = new ConcurrentLinkedQueue<>(); - // use the small one as the default timeout value, and increase the timeout value if we have an - // action in the group needs a larger timeout value. - long defaultRpcTimeoutNs; - if (readRpcTimeoutNs > 0) { - defaultRpcTimeoutNs = - writeRpcTimeoutNs > 0 ? Math.min(readRpcTimeoutNs, writeRpcTimeoutNs) : readRpcTimeoutNs; - } else { - defaultRpcTimeoutNs = writeRpcTimeoutNs > 0 ? writeRpcTimeoutNs : -1L; - } CompletableFuture.allOf(actions .map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(), RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> { @@ -454,9 +406,8 @@ class AsyncBatchRpcRetryingCaller { addError(action, error, null); locateFailed.add(action); } else { - computeIfAbsent(actionsByServer, loc.getServerName(), - () -> new ServerRequest(defaultRpcTimeoutNs)).addAction(loc, action, - getRpcTimeoutNs(action)); + computeIfAbsent(actionsByServer, loc.getServerName(), () -> new ServerRequest()) + .addAction(loc, action); } })) .toArray(CompletableFuture[]::new)).whenComplete((v, r) -> { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java index d7a3ed1..6c77a47 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java @@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType; import java.io.IOException; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; @@ -55,14 +54,12 @@ class AsyncClientScanner { private final AsyncConnectionImpl conn; - private final long scanTimeoutNs; - - private final long rpcTimeoutNs; + private OperationConfig operationConfig; private final ScanResultCache resultCache; public AsyncClientScanner(Scan scan, RawScanResultConsumer consumer, TableName tableName, - AsyncConnectionImpl conn, long scanTimeoutNs, long rpcTimeoutNs) { + AsyncConnectionImpl conn, OperationConfig operationConfig) { if (scan.getStartRow() == null) { scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow()); } @@ -73,8 +70,7 @@ class AsyncClientScanner { this.consumer = consumer; this.tableName = tableName; this.conn = conn; - this.scanTimeoutNs = scanTimeoutNs; - this.rpcTimeoutNs = rpcTimeoutNs; + this.operationConfig = operationConfig; this.resultCache = scan.getAllowPartialResults() || scan.getBatch() > 0 ? new AllowPartialScanResultCache() : new CompleteScanResultCache(); } @@ -115,9 +111,8 @@ class AsyncClientScanner { private void startScan(OpenScannerResponse resp) { conn.callerFactory.scanSingleRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub) - .setScan(scan).consumer(consumer).resultCache(resultCache) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start().whenComplete((hasMore, error) -> { + .setScan(scan).consumer(consumer).resultCache(resultCache).operationConfig(operationConfig) + .start().whenComplete((hasMore, error) -> { if (error != null) { consumer.onError(error); return; @@ -132,9 +127,8 @@ class AsyncClientScanner { private void openScanner() { conn.callerFactory. single().table(tableName).row(scan.getStartRow()) - .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::callOpenScanner).call() - .whenComplete((resp, error) -> { + .locateType(getLocateType(scan)).operationConfig(operationConfig) + .action(this::callOpenScanner).call().whenComplete((resp, error) -> { if (error != null) { consumer.onError(error); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java index 6279d46..b867234 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java @@ -56,6 +56,10 @@ class AsyncConnectionConfiguration { // by this value, see scanTimeoutNs. private final long operationTimeoutNs; + // timeout for each rpc request, maybe override by a more specific config such as readRpcTimeout + // or writeRpcTimeout. + private final long rpcTimeoutNs; + // timeout for each read rpc request private final long readRpcTimeoutNs; @@ -85,10 +89,12 @@ class AsyncConnectionConfiguration { conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos( conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)); - this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, - conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT))); - this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, - conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT))); + this.rpcTimeoutNs = TimeUnit.MILLISECONDS + .toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)); + this.readRpcTimeoutNs = + TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs)); + this.writeRpcTimeoutNs = + TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs)); this.pauseNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE)); this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); @@ -111,6 +117,10 @@ class AsyncConnectionConfiguration { return operationTimeoutNs; } + long getRpcTimeoutNs() { + return rpcTimeoutNs; + } + long getReadRpcTimeoutNs() { return readRpcTimeoutNs; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java index d660b02..3a199bf 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java @@ -90,7 +90,6 @@ class AsyncConnectionImpl implements AsyncConnection { private final ConcurrentMap rsStubs = new ConcurrentHashMap<>(); - @SuppressWarnings("deprecation") public AsyncConnectionImpl(Configuration conf, User user) { this.conf = conf; this.user = user; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index 55c56ab..0e39229 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -24,7 +24,6 @@ import io.netty.util.HashedWheelTimer; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; @@ -54,9 +53,7 @@ class AsyncRpcRetryingCallerFactory { private AsyncSingleRequestRpcRetryingCaller.Callable callable; - private long operationTimeoutNs = -1L; - - private long rpcTimeoutNs = -1L; + private OperationConfig operationConfig; private RegionLocateType locateType = RegionLocateType.CURRENT; @@ -70,19 +67,14 @@ class AsyncRpcRetryingCallerFactory { return this; } - public SingleRequestCallerBuilder action( - AsyncSingleRequestRpcRetryingCaller.Callable callable) { + public SingleRequestCallerBuilder + action(AsyncSingleRequestRpcRetryingCaller.Callable callable) { this.callable = callable; return this; } - public SingleRequestCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) { - this.operationTimeoutNs = unit.toNanos(operationTimeout); - return this; - } - - public SingleRequestCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { - this.rpcTimeoutNs = unit.toNanos(rpcTimeout); + public SingleRequestCallerBuilder operationConfig(OperationConfig operationConfig) { + this.operationConfig = operationConfig; return this; } @@ -95,8 +87,7 @@ class AsyncRpcRetryingCallerFactory { return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"), - conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs, - rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt()); + checkNotNull(operationConfig, "operationConfig is null")); } /** @@ -122,9 +113,7 @@ class AsyncRpcRetryingCallerFactory { private int limit; - private long scanTimeoutNs = -1L; - - private long rpcTimeoutNs = -1L; + private OperationConfig operationConfig; public SmallScanCallerBuilder table(TableName tableName) { this.tableName = tableName; @@ -141,22 +130,16 @@ class AsyncRpcRetryingCallerFactory { return this; } - public SmallScanCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { - this.scanTimeoutNs = unit.toNanos(scanTimeout); - return this; - } - - public SmallScanCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { - this.rpcTimeoutNs = unit.toNanos(rpcTimeout); + public SmallScanCallerBuilder operationConfig(OperationConfig operationConfig) { + this.operationConfig = operationConfig; return this; } public AsyncSmallScanRpcRetryingCaller build() { - TableName tableName = checkNotNull(this.tableName, "tableName is null"); - Scan scan = checkNotNull(this.scan, "scan is null"); checkArgument(limit > 0, "invalid limit %d", limit); - return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, scanTimeoutNs, - rpcTimeoutNs); + return new AsyncSmallScanRpcRetryingCaller(conn, checkNotNull(tableName, "tableName is null"), + checkNotNull(scan, "scan is null"), limit, + checkNotNull(operationConfig, "operationConfig is null")); } /** @@ -188,9 +171,7 @@ class AsyncRpcRetryingCallerFactory { private HRegionLocation loc; - private long scanTimeoutNs; - - private long rpcTimeoutNs; + private OperationConfig operationConfig; public ScanSingleRegionCallerBuilder id(long scannerId) { this.scannerId = scannerId; @@ -222,13 +203,8 @@ class AsyncRpcRetryingCallerFactory { return this; } - public ScanSingleRegionCallerBuilder scanTimeout(long scanTimeout, TimeUnit unit) { - this.scanTimeoutNs = unit.toNanos(scanTimeout); - return this; - } - - public ScanSingleRegionCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) { - this.rpcTimeoutNs = unit.toNanos(rpcTimeout); + public ScanSingleRegionCallerBuilder operationConfig(OperationConfig operationConfig) { + this.operationConfig = operationConfig; return this; } @@ -238,9 +214,8 @@ class AsyncRpcRetryingCallerFactory { checkNotNull(scan, "scan is null"), scannerId, checkNotNull(resultCache, "resultCache is null"), checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"), - checkNotNull(loc, "location is null"), conn.connConf.getPauseNs(), - conn.connConf.getMaxRetries(), scanTimeoutNs, rpcTimeoutNs, - conn.connConf.getStartLogErrorsCnt()); + checkNotNull(loc, "location is null"), + checkNotNull(operationConfig, "operationConfig is null")); } /** @@ -264,11 +239,7 @@ class AsyncRpcRetryingCallerFactory { private List actions; - private long operationTimeoutNs = -1L; - - private long readRpcTimeoutNs = -1L; - - private long writeRpcTimeoutNs = -1L; + private OperationConfig operationConfig; public BatchCallerBuilder table(TableName tableName) { this.tableName = tableName; @@ -280,25 +251,15 @@ class AsyncRpcRetryingCallerFactory { return this; } - public BatchCallerBuilder operationTimeout(long operationTimeout, TimeUnit unit) { - this.operationTimeoutNs = unit.toNanos(operationTimeout); - return this; - } - - public BatchCallerBuilder readRpcTimeout(long rpcTimeout, TimeUnit unit) { - this.readRpcTimeoutNs = unit.toNanos(rpcTimeout); - return this; - } - - public BatchCallerBuilder writeRpcTimeout(long rpcTimeout, TimeUnit unit) { - this.writeRpcTimeoutNs = unit.toNanos(rpcTimeout); + public BatchCallerBuilder operationConfig(OperationConfig operationConfig) { + this.operationConfig = operationConfig; return this; } public AsyncBatchRpcRetryingCaller build() { - return new AsyncBatchRpcRetryingCaller(retryTimer, conn, tableName, actions, - conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs, - readRpcTimeoutNs, writeRpcTimeoutNs, conn.connConf.getStartLogErrorsCnt()); + return new AsyncBatchRpcRetryingCaller(retryTimer, conn, + checkNotNull(tableName, "tableName is null"), checkNotNull(actions, "actions is null"), + checkNotNull(operationConfig, "operationConfig is null")); } public List> call() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index dae88a7..98b5700 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan; import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForScan; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; -import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import io.netty.util.HashedWheelTimer; @@ -77,15 +76,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final HRegionLocation loc; - private final long pauseNs; - - private final int maxAttempts; - - private final long scanTimeoutNs; - - private final long rpcTimeoutNs; - - private final int startLogErrorsCnt; + private final OperationConfig operationConfig; private final Runnable completeWhenNoMoreResultsInRegion; @@ -107,8 +98,8 @@ class AsyncScanSingleRegionRpcRetryingCaller { public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache, - RawScanResultConsumer consumer, Interface stub, HRegionLocation loc, long pauseNs, - int maxRetries, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + RawScanResultConsumer consumer, Interface stub, HRegionLocation loc, + OperationConfig operationConfig) { this.retryTimer = retryTimer; this.scan = scan; this.scannerId = scannerId; @@ -116,11 +107,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { this.consumer = consumer; this.stub = stub; this.loc = loc; - this.pauseNs = pauseNs; - this.maxAttempts = retries2Attempts(maxRetries); - this.scanTimeoutNs = scanTimeoutNs; - this.rpcTimeoutNs = rpcTimeoutNs; - this.startLogErrorsCnt = startLogErrorsCnt; + this.operationConfig = operationConfig; if (scan.isReversed()) { completeWhenNoMoreResultsInRegion = this::completeReversedWhenNoMoreResultsInRegion; } else { @@ -136,7 +123,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { } private void closeScanner() { - resetController(controller, rpcTimeoutNs); + resetController(controller, operationConfig.getRpcTimeoutNs()); ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); stub.scan(controller, req, resp -> { if (controller.failed()) { @@ -178,12 +165,12 @@ class AsyncScanSingleRegionRpcRetryingCaller { private void onError(Throwable error) { error = translateException(error); - if (tries > startLogErrorsCnt) { + if (tries > operationConfig.getStartLogErrorsCnt()) { LOG.warn("Call to " + loc.getServerName() + " for scanner id = " + scannerId + " for " + loc.getRegionInfo().getEncodedName() + " of " + loc.getRegionInfo().getTable() - + " failed, , tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " - + TimeUnit.NANOSECONDS.toMillis(scanTimeoutNs) + " ms, time elapsed = " + elapsedMs() - + " ms", + + " failed, , tries = " + tries + ", maxAttempts = " + operationConfig.getMaxAttempts() + + ", timeout = " + operationConfig.getOperationTimeout(TimeUnit.MILLISECONDS) + + " ms, time elapsed = " + elapsedMs() + " ms", error); } boolean scannerClosed = @@ -193,20 +180,21 @@ class AsyncScanSingleRegionRpcRetryingCaller { new RetriesExhaustedException.ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(), ""); exceptions.add(qt); - if (tries >= maxAttempts) { + if (tries >= operationConfig.getMaxAttempts()) { completeExceptionally(!scannerClosed); return; } long delayNs; - if (scanTimeoutNs > 0) { - long maxDelayNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs); + if (operationConfig.getOperationTimeoutNs() > 0) { + long maxDelayNs = + operationConfig.getOperationTimeoutNs() - (System.nanoTime() - nextCallStartNs); if (maxDelayNs <= 0) { completeExceptionally(!scannerClosed); return; } - delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); + delayNs = Math.min(maxDelayNs, getPauseTime(operationConfig.getRetryPauseNs(), tries - 1)); } else { - delayNs = getPauseTime(pauseNs, tries - 1); + delayNs = getPauseTime(operationConfig.getRetryPauseNs(), tries - 1); } if (scannerClosed) { completeWhenError(false); @@ -297,7 +285,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { } private void call() { - resetController(controller, rpcTimeoutNs); + resetController(controller, operationConfig.getRpcTimeoutNs()); ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false, nextCallSeq, false, false); stub.scan(controller, req, this::onComplete); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 04e69af..b9515b0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; -import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import io.netty.util.HashedWheelTimer; @@ -70,15 +69,7 @@ class AsyncSingleRequestRpcRetryingCaller { private final Callable callable; - private final long pauseNs; - - private final int maxAttempts; - - private final long operationTimeoutNs; - - private final long rpcTimeoutNs; - - private final int startLogErrorsCnt; + private final OperationConfig operationConfig; private final CompletableFuture future; @@ -90,19 +81,14 @@ class AsyncSingleRequestRpcRetryingCaller { public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn, TableName tableName, byte[] row, RegionLocateType locateType, Callable callable, - long pauseNs, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, - int startLogErrorsCnt) { + OperationConfig operationConfig) { this.retryTimer = retryTimer; this.conn = conn; this.tableName = tableName; this.row = row; this.locateType = locateType; this.callable = callable; - this.pauseNs = pauseNs; - this.maxAttempts = retries2Attempts(maxRetries); - this.operationTimeoutNs = operationTimeoutNs; - this.rpcTimeoutNs = rpcTimeoutNs; - this.startLogErrorsCnt = startLogErrorsCnt; + this.operationConfig = operationConfig; this.future = new CompletableFuture<>(); this.controller = conn.rpcControllerFactory.newController(); this.exceptions = new ArrayList<>(); @@ -116,7 +102,7 @@ class AsyncSingleRequestRpcRetryingCaller { } private long remainingTimeNs() { - return operationTimeoutNs - (System.nanoTime() - startNs); + return operationConfig.getOperationTimeoutNs() - (System.nanoTime() - startNs); } private void completeExceptionally() { @@ -126,27 +112,27 @@ class AsyncSingleRequestRpcRetryingCaller { private void onError(Throwable error, Supplier errMsg, Consumer updateCachedLocation) { error = translateException(error); - if (tries > startLogErrorsCnt) { + if (tries > operationConfig.getStartLogErrorsCnt()) { LOG.warn(errMsg.get(), error); } RetriesExhaustedException.ThrowableWithExtraContext qt = new RetriesExhaustedException.ThrowableWithExtraContext(error, EnvironmentEdgeManager.currentTime(), ""); exceptions.add(qt); - if (error instanceof DoNotRetryIOException || tries >= maxAttempts) { + if (error instanceof DoNotRetryIOException || tries >= operationConfig.getMaxAttempts()) { completeExceptionally(); return; } long delayNs; - if (operationTimeoutNs > 0) { + if (operationConfig.getOperationTimeoutNs() > 0) { long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS; if (maxDelayNs <= 0) { completeExceptionally(); return; } - delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1)); + delayNs = Math.min(maxDelayNs, getPauseTime(operationConfig.getRetryPauseNs(), tries - 1)); } else { - delayNs = getPauseTime(pauseNs, tries - 1); + delayNs = getPauseTime(operationConfig.getRetryPauseNs(), tries - 1); } updateCachedLocation.accept(error); tries++; @@ -155,15 +141,15 @@ class AsyncSingleRequestRpcRetryingCaller { private void call(HRegionLocation loc) { long callTimeoutNs; - if (operationTimeoutNs > 0) { + if (operationConfig.getOperationTimeoutNs() > 0) { callTimeoutNs = remainingTimeNs(); if (callTimeoutNs <= 0) { completeExceptionally(); return; } - callTimeoutNs = Math.min(callTimeoutNs, rpcTimeoutNs); + callTimeoutNs = Math.min(callTimeoutNs, operationConfig.getRpcTimeoutNs()); } else { - callTimeoutNs = rpcTimeoutNs; + callTimeoutNs = operationConfig.getRpcTimeoutNs(); } ClientService.Interface stub; try { @@ -172,9 +158,9 @@ class AsyncSingleRequestRpcRetryingCaller { onError(e, () -> "Get async stub to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " + loc.getRegionInfo().getEncodedName() + " of " + tableName - + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " - + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = " - + elapsedMs() + " ms", + + " failed, tries = " + tries + ", maxAttempts = " + operationConfig.getMaxAttempts() + + ", timeout = " + operationConfig.getOperationTimeout(TimeUnit.MILLISECONDS) + + " ms, time elapsed = " + elapsedMs() + " ms", err -> conn.getLocator().updateCachedLocation(loc, err)); return; } @@ -184,8 +170,8 @@ class AsyncSingleRequestRpcRetryingCaller { onError(error, () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " + loc.getRegionInfo().getEncodedName() + " of " + tableName + " failed, tries = " - + tries + ", maxAttempts = " + maxAttempts + ", timeout = " - + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = " + + tries + ", maxAttempts = " + operationConfig.getMaxAttempts() + ", timeout = " + + operationConfig.getOperationTimeout(TimeUnit.MILLISECONDS) + " ms, time elapsed = " + elapsedMs() + " ms", err -> conn.getLocator().updateCachedLocation(loc, err)); return; @@ -196,7 +182,7 @@ class AsyncSingleRequestRpcRetryingCaller { private void locateThenCall() { long locateTimeoutNs; - if (operationTimeoutNs > 0) { + if (operationConfig.getOperationTimeoutNs() > 0) { locateTimeoutNs = remainingTimeNs(); if (locateTimeoutNs <= 0) { completeExceptionally(); @@ -210,9 +196,10 @@ class AsyncSingleRequestRpcRetryingCaller { if (error != null) { onError(error, () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName - + " failed, tries = " + tries + ", maxAttempts = " + maxAttempts + ", timeout = " - + TimeUnit.NANOSECONDS.toMillis(operationTimeoutNs) + " ms, time elapsed = " - + elapsedMs() + " ms", + + " failed, tries = " + tries + ", maxAttempts = " + + operationConfig.getMaxAttempts() + ", timeout = " + + operationConfig.getOperationTimeout(TimeUnit.MILLISECONDS) + + " ms, time elapsed = " + elapsedMs() + " ms", err -> { }); return; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java index 6ffa30a..b9e28cc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import java.util.function.Function; import org.apache.hadoop.hbase.HRegionInfo; @@ -53,9 +52,7 @@ class AsyncSmallScanRpcRetryingCaller { private final int limit; - private final long scanTimeoutNs; - - private final long rpcTimeoutNs; + private final OperationConfig operationConfig; private final Function nextScan; @@ -64,13 +61,12 @@ class AsyncSmallScanRpcRetryingCaller { private final CompletableFuture> future; public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan, - int limit, long scanTimeoutNs, long rpcTimeoutNs) { + int limit, OperationConfig operationConfig) { this.conn = conn; this.tableName = tableName; this.scan = scan; this.limit = limit; - this.scanTimeoutNs = scanTimeoutNs; - this.rpcTimeoutNs = rpcTimeoutNs; + this.operationConfig = operationConfig; if (scan.isReversed()) { this.nextScan = this::reversedNextScan; } else { @@ -145,8 +141,7 @@ class AsyncSmallScanRpcRetryingCaller { private void scan() { conn.callerFactory. single().table(tableName).row(scan.getStartRow()) - .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::scan).call() + .operationConfig(operationConfig).locateType(getLocateType(scan)).action(this::scan).call() .whenComplete((resp, error) -> { if (error != null) { future.completeExceptionally(error); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java index 893beb9..a5de0d7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import edu.umd.cs.findbugs.annotations.Nullable; + /** * The asynchronous table for normal users. *

@@ -55,7 +57,30 @@ public interface AsyncTable extends AsyncTableBase { * @param scan A configured {@link Scan} object. * @return A scanner. */ - ResultScanner getScanner(Scan scan); + default ResultScanner getScanner(Scan scan) { + return getScanner(scan, null); + } + + /** + * Returns a scanner on the current table as specified by the {@link Scan} object. + * @param scan A configured {@link Scan} object. + * @param operationConfig the operation configuration for this call, can be null. + * @return A scanner. + */ + ResultScanner getScanner(Scan scan, @Nullable OperationConfig operationConfig); + + /** + * The scan API uses the observer pattern. All results that match the given scan object will be + * passed to the given {@code consumer} by calling {@link ScanResultConsumer#onNext(Result)}. + * {@link ScanResultConsumer#onComplete()} means the scan is finished, and + * {@link ScanResultConsumer#onError(Throwable)} means we hit an unrecoverable error and the scan + * is terminated. + * @param scan A configured {@link Scan} object. + * @param consumer the consumer used to receive results. + */ + default void scan(Scan scan, ScanResultConsumer consumer) { + scan(scan, consumer, null); + } /** * The scan API uses the observer pattern. All results that match the given scan object will be @@ -65,6 +90,7 @@ public interface AsyncTable extends AsyncTableBase { * is terminated. * @param scan A configured {@link Scan} object. * @param consumer the consumer used to receive results. + * @param operationConfig the operation configuration for this call, can be null. */ - void scan(Scan scan, ScanResultConsumer consumer); + void scan(Scan scan, ScanResultConsumer consumer, @Nullable OperationConfig operationConfig); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java index 19a22c0..b222c31 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java @@ -26,21 +26,22 @@ import com.google.common.base.Preconditions; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.client.OperationConfig.OperationConfigBuilder; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.util.Bytes; +import edu.umd.cs.findbugs.annotations.Nullable; + /** * The base interface for asynchronous version of Table. Obtain an instance from a * {@link AsyncConnection}. *

- * The implementation is NOT required to be thread safe. Do NOT access it from multiple threads - * concurrently. + * The implementation is thread safe. *

* Usually the implementations will not throw any exception directly, you need to get the exception * from the returned {@link CompletableFuture}. @@ -62,62 +63,26 @@ public interface AsyncTableBase { Configuration getConfiguration(); /** - * Set timeout of each rpc read request in operations of this Table instance, will override the - * value of {@code hbase.rpc.read.timeout} in configuration. If a rpc read request waiting too - * long, it will stop waiting and send a new request to retry until retries exhausted or operation - * timeout reached. - */ - void setReadRpcTimeout(long timeout, TimeUnit unit); - - /** - * Get timeout of each rpc read request in this Table instance. + * Create a new retry config. The fields of the return builder have already been set with the + * default value, so you just need to set the field you care about(Usually rpc timeout and + * operation timeout). + * @return A builder to create operation config. */ - long getReadRpcTimeout(TimeUnit unit); + OperationConfigBuilder newOperationConfig(); /** - * Set timeout of each rpc write request in operations of this Table instance, will override the - * value of {@code hbase.rpc.write.timeout} in configuration. If a rpc write request waiting too - * long, it will stop waiting and send a new request to retry until retries exhausted or operation - * timeout reached. - */ - void setWriteRpcTimeout(long timeout, TimeUnit unit); - - /** - * Get timeout of each rpc write request in this Table instance. - */ - long getWriteRpcTimeout(TimeUnit unit); - - /** - * Set timeout of each operation in this Table instance, will override the value of - * {@code hbase.client.operation.timeout} in configuration. + * Test for the existence of columns in the table, as specified by the Get. *

- * Operation timeout is a top-level restriction that makes sure an operation will not be blocked - * more than this. In each operation, if rpc request fails because of timeout or other reason, it - * will retry until success or throw a RetriesExhaustedException. But if the total time elapsed - * reach the operation timeout before retries exhausted, it will break early and throw - * SocketTimeoutException. - */ - void setOperationTimeout(long timeout, TimeUnit unit); - - /** - * Get timeout of each operation in Table instance. - */ - long getOperationTimeout(TimeUnit unit); - - /** - * Set timeout of a single operation in a scan, such as openScanner and next. Will override the - * value {@code hbase.client.scanner.timeout.period} in configuration. + * This will return true if the Get matches one or more keys, false if not. *

- * Generally a scan will never timeout after we add heartbeat support unless the region is - * crashed. The {@code scanTimeout} works like the {@code operationTimeout} for each single - * operation in a scan. - */ - void setScanTimeout(long timeout, TimeUnit unit); - - /** - * Get the timeout of a single operation in a scan. + * This is a server-side call so it prevents any data from being transfered to the client. + * @param get The object that specifies what data to test and from which row. + * @return true if the specified Get matches one or more keys, false if not. The return value will + * be wrapped by a {@link CompletableFuture}. */ - long getScanTimeout(TimeUnit unit); + default CompletableFuture exists(Get get) { + return exists(get, null); + } /** * Test for the existence of columns in the table, as specified by the Get. @@ -125,36 +90,71 @@ public interface AsyncTableBase { * This will return true if the Get matches one or more keys, false if not. *

* This is a server-side call so it prevents any data from being transfered to the client. + * @param get The object that specifies what data to test and from which row. + * @param operationConfig the operation configuration for this call, can be null. * @return true if the specified Get matches one or more keys, false if not. The return value will * be wrapped by a {@link CompletableFuture}. */ - default CompletableFuture exists(Get get) { - return get(toCheckExistenceOnly(get)).thenApply(r -> r.getExists()); + default CompletableFuture exists(Get get, @Nullable OperationConfig operationConfig) { + return get(toCheckExistenceOnly(get), operationConfig).thenApply(r -> r.getExists()); + } + + /** + * Extracts certain cells from a given row. + * @param get The object that specifies what data to fetch and from which row. + * @return The data coming from the specified row, if it exists. If the row specified doesn't + * exist, the {@link Result} instance returned won't contain any + * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The + * return value will be wrapped by a {@link CompletableFuture}. + */ + default CompletableFuture get(Get get) { + return get(get, null); } /** * Extracts certain cells from a given row. * @param get The object that specifies what data to fetch and from which row. + * @param operationConfig the operation configuration for this call, can be null. * @return The data coming from the specified row, if it exists. If the row specified doesn't * exist, the {@link Result} instance returned won't contain any * {@link org.apache.hadoop.hbase.KeyValue}, as indicated by {@link Result#isEmpty()}. The * return value will be wrapped by a {@link CompletableFuture}. */ - CompletableFuture get(Get get); + CompletableFuture get(Get get, @Nullable OperationConfig operationConfig); + + /** + * Puts some data to the table. + * @param put The data to put. + * @return A {@link CompletableFuture} that always returns null when complete normally. + */ + default CompletableFuture put(Put put) { + return put(put, null); + } /** * Puts some data to the table. * @param put The data to put. + * @param operationConfig the operation configuration for this call, can be null. * @return A {@link CompletableFuture} that always returns null when complete normally. */ - CompletableFuture put(Put put); + CompletableFuture put(Put put, @Nullable OperationConfig operationConfig); /** * Deletes the specified cells/row. * @param delete The object that specifies what to delete. * @return A {@link CompletableFuture} that always returns null when complete normally. */ - CompletableFuture delete(Delete delete); + default CompletableFuture delete(Delete delete) { + return delete(delete, null); + } + + /** + * Deletes the specified cells/row. + * @param delete The object that specifies what to delete. + * @param operationConfig the operation configuration for this call, can be null. + * @return A {@link CompletableFuture} that always returns null when complete normally. + */ + CompletableFuture delete(Delete delete, @Nullable OperationConfig operationConfig); /** * Appends values to one or more columns within a single row. @@ -167,7 +167,23 @@ public interface AsyncTableBase { * @return values of columns after the append operation (maybe null). The return value will be * wrapped by a {@link CompletableFuture}. */ - CompletableFuture append(Append append); + default CompletableFuture append(Append append) { + return append(append, null); + } + + /** + * Appends values to one or more columns within a single row. + *

+ * This operation does not appear atomic to readers. Appends are done under a single row lock, so + * write operations to a row are synchronized, but readers do not take row locks so get and scan + * operations can see this operation partially completed. + * @param append object that specifies the columns and amounts to be used for the increment + * operations. + * @param operationConfig the operation configuration for this call, can be null. + * @return values of columns after the append operation (maybe null). The return value will be + * wrapped by a {@link CompletableFuture}. + */ + CompletableFuture append(Append append, @Nullable OperationConfig operationConfig); /** * Increments one or more columns within a single row. @@ -180,7 +196,24 @@ public interface AsyncTableBase { * @return values of columns after the increment. The return value will be wrapped by a * {@link CompletableFuture}. */ - CompletableFuture increment(Increment increment); + default CompletableFuture increment(Increment increment) { + return increment(increment, null); + } + + /** + * Increments one or more columns within a single row. + *

+ * This operation does not appear atomic to readers. Increments are done under a single row lock, + * so write operations to a row are synchronized, but readers do not take row locks so get and + * scan operations can see this operation partially completed. + * @param increment object that specifies the columns and amounts to be used for the increment + * operations. + * @param operationConfig the operation configuration for this call, can be null. + * @return values of columns after the increment. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + CompletableFuture increment(Increment increment, + @Nullable OperationConfig operationConfig); /** * See {@link #incrementColumnValue(byte[], byte[], byte[], long, Durability)} @@ -253,8 +286,27 @@ public interface AsyncTableBase { * @return true if the new put was executed, false otherwise. The return value will be wrapped by * a {@link CompletableFuture}. */ + default CompletableFuture checkAndPut(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Put put) { + return checkAndPut(row, family, qualifier, compareOp, value, put, null); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it + * adds the put. If the passed value is null, the check is for the lack of column (ie: + * non-existence) + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp comparison operator to use + * @param value the expected value + * @param put data to put if check succeeds + * @param operationConfig the operation configuration for this call, can be null. + * @return true if the new put was executed, false otherwise. The return value will be wrapped by + * a {@link CompletableFuture}. + */ CompletableFuture checkAndPut(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, Put put); + CompareOp compareOp, byte[] value, Put put, @Nullable OperationConfig operationConfig); /** * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it @@ -286,16 +338,47 @@ public interface AsyncTableBase { * @return true if the new delete was executed, false otherwise. The return value will be wrapped * by a {@link CompletableFuture}. */ + default CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, Delete delete) { + return checkAndDelete(row, family, qualifier, compareOp, value, delete, null); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it + * adds the delete. If the passed value is null, the check is for the lack of column (ie: + * non-existence) + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp comparison operator to use + * @param value the expected value + * @param delete data to delete if check succeeds + * @param operationConfig the operation configuration for this call, can be null. + * @return true if the new delete was executed, false otherwise. The return value will be wrapped + * by a {@link CompletableFuture}. + */ CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, Delete delete); + CompareOp compareOp, byte[] value, Delete delete, @Nullable OperationConfig operationConfig); + + /** + * Performs multiple mutations atomically on a single row. Currently {@link Put} and + * {@link Delete} are supported. + * @param mutation object that specifies the set of mutations to perform atomically + * @return A {@link CompletableFuture} that always returns null when complete normally. + */ + default CompletableFuture mutateRow(RowMutations mutation) { + return mutateRow(mutation, null); + } /** * Performs multiple mutations atomically on a single row. Currently {@link Put} and * {@link Delete} are supported. * @param mutation object that specifies the set of mutations to perform atomically + * @param operationConfig the operation configuration for this call, can be null. * @return A {@link CompletableFuture} that always returns null when complete normally. */ - CompletableFuture mutateRow(RowMutations mutation); + CompletableFuture mutateRow(RowMutations mutation, + @Nullable OperationConfig operationConfig); /** * Atomically checks if a row/family/qualifier value equals to the expected value. If it does, it @@ -327,8 +410,28 @@ public interface AsyncTableBase { * @return true if the new put was executed, false otherwise. The return value will be wrapped by * a {@link CompletableFuture}. */ + default CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, + CompareOp compareOp, byte[] value, RowMutations mutation) { + return checkAndMutate(row, family, qualifier, compareOp, value, mutation, null); + } + + /** + * Atomically checks if a row/family/qualifier value matches the expected value. If it does, it + * performs the row mutations. If the passed value is null, the check is for the lack of column + * (ie: non-existence) + * @param row to check + * @param family column family to check + * @param qualifier column qualifier to check + * @param compareOp the comparison operator + * @param value the expected value + * @param mutation mutations to perform if check succeeds + * @param operationConfig the operation configuration for this call, can be null. + * @return true if the new put was executed, false otherwise. The return value will be wrapped by + * a {@link CompletableFuture}. + */ CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, RowMutations mutation); + CompareOp compareOp, byte[] value, RowMutations mutation, + @Nullable OperationConfig operationConfig); /** * Just call {@link #smallScan(Scan, int)} with {@link Integer#MAX_VALUE}. @@ -350,7 +453,25 @@ public interface AsyncTableBase { * @return The results of this small scan operation. The return value will be wrapped by a * {@link CompletableFuture}. */ - CompletableFuture> smallScan(Scan scan, int limit); + default CompletableFuture> smallScan(Scan scan, int limit) { + return smallScan(scan, limit, null); + } + + /** + * Return all the results that match the given scan object. The number of the returned results + * will not be greater than {@code limit}. + *

+ * Notice that the scan must be small, and should not use batch or allowPartialResults. The + * {@code caching} property of the scan object is also ignored as we will use {@code limit} + * instead. + * @param scan A configured {@link Scan} object. + * @param limit the limit of results count + * @param operationConfig the operation configuration for this call, can be null. + * @return The results of this small scan operation. The return value will be wrapped by a + * {@link CompletableFuture}. + */ + CompletableFuture> smallScan(Scan scan, int limit, + @Nullable OperationConfig operationConfig); /** * Extracts certain cells from the given rows, in batch. @@ -362,7 +483,22 @@ public interface AsyncTableBase { * @return A list of {@link CompletableFuture}s that represent the result for each get. */ default List> get(List gets) { - return batch(gets); + return get(gets, null); + } + + /** + * Extracts certain cells from the given rows, in batch. + *

+ * Notice that you may not get all the results with this function, which means some of the + * returned {@link CompletableFuture}s may succeed while some of the other returned + * {@link CompletableFuture}s may fail. + * @param gets The objects that specify what data to fetch and from which rows. + * @param operationConfig the operation configuration for this call, can be null. + * @return A list of {@link CompletableFuture}s that represent the result for each get. + */ + default List> get(List gets, + @Nullable OperationConfig operationConfig) { + return batch(gets, operationConfig); } /** @@ -372,7 +508,19 @@ public interface AsyncTableBase { * @return A {@link CompletableFuture} that wrapper the result list. */ default CompletableFuture> getAll(List gets) { - return batchAll(gets); + return getAll(gets, null); + } + + /** + * A simple version for batch get. It will fail if there are any failures and you will get the + * whole result list at once if the operation is succeeded. + * @param gets The objects that specify what data to fetch and from which rows. + * @param operationConfig the operation configuration for this call, can be null. + * @return A {@link CompletableFuture} that wrapper the result list. + */ + default CompletableFuture> getAll(List gets, + @Nullable OperationConfig operationConfig) { + return batchAll(gets, operationConfig); } /** @@ -386,8 +534,24 @@ public interface AsyncTableBase { * @return A list of {@link CompletableFuture}s that represent the existence for each get. */ default List> exists(List gets) { - return get(toCheckExistenceOnly(gets)).stream().map(f -> f.thenApply(r -> r.getExists())) - .collect(toList()); + return exists(gets, null); + } + + /** + * Test for the existence of columns in the table, as specified by the Gets. + *

+ * This will return a list of booleans. Each value will be true if the related Get matches one or + * more keys, false if not. + *

+ * This is a server-side call so it prevents any data from being transferred to the client. + * @param gets the Gets + * @param operationConfig the operation configuration for this call, can be null. + * @return A list of {@link CompletableFuture}s that represent the existence for each get. + */ + default List> exists(List gets, + @Nullable OperationConfig operationConfig) { + return get(toCheckExistenceOnly(gets), operationConfig).stream() + .map(f -> f.thenApply(r -> r.getExists())).collect(toList()); } /** @@ -397,7 +561,19 @@ public interface AsyncTableBase { * @return A {@link CompletableFuture} that wrapper the result boolean list. */ default CompletableFuture> existsAll(List gets) { - return getAll(toCheckExistenceOnly(gets)) + return existsAll(gets, null); + } + + /** + * A simple version for batch exists. It will fail if there are any failures and you will get the + * whole result boolean list at once if the operation is succeeded. + * @param gets the Gets + * @param operationConfig the operation configuration for this call, can be null. + * @return A {@link CompletableFuture} that wrapper the result boolean list. + */ + default CompletableFuture> existsAll(List gets, + OperationConfig operationConfig) { + return getAll(toCheckExistenceOnly(gets), operationConfig) .thenApply(l -> l.stream().map(r -> r.getExists()).collect(toList())); } @@ -407,7 +583,18 @@ public interface AsyncTableBase { * @return A list of {@link CompletableFuture}s that represent the result for each put. */ default List> put(List puts) { - return voidBatch(this, puts); + return put(puts, null); + } + + /** + * Puts some data in the table, in batch. + * @param puts The list of mutations to apply. + * @param operationConfig the operation configuration for this call, can be null. + * @return A list of {@link CompletableFuture}s that represent the result for each put. + */ + default List> put(List puts, + @Nullable OperationConfig operationConfig) { + return voidBatch(this, puts, operationConfig); } /** @@ -416,7 +603,18 @@ public interface AsyncTableBase { * @return A {@link CompletableFuture} that always returns null when complete normally. */ default CompletableFuture putAll(List puts) { - return voidBatchAll(this, puts); + return putAll(puts, null); + } + + /** + * A simple version of batch put. It will fail if there are any failures. + * @param puts The list of mutations to apply. + * @param operationConfig the operation configuration for this call, can be null. + * @return A {@link CompletableFuture} that always returns null when complete normally. + */ + default CompletableFuture putAll(List puts, + @Nullable OperationConfig operationConfig) { + return voidBatchAll(this, puts, operationConfig); } /** @@ -425,7 +623,18 @@ public interface AsyncTableBase { * @return A list of {@link CompletableFuture}s that represent the result for each delete. */ default List> delete(List deletes) { - return voidBatch(this, deletes); + return delete(deletes, null); + } + + /** + * Deletes the specified cells/rows in bulk. + * @param deletes list of things to delete. + * @param operationConfig the operation configuration for this call, can be null. + * @return A list of {@link CompletableFuture}s that represent the result for each delete. + */ + default List> delete(List deletes, + OperationConfig operationConfig) { + return voidBatch(this, deletes, operationConfig); } /** @@ -434,7 +643,30 @@ public interface AsyncTableBase { * @return A {@link CompletableFuture} that always returns null when complete normally. */ default CompletableFuture deleteAll(List deletes) { - return voidBatchAll(this, deletes); + return deleteAll(deletes, null); + } + + /** + * A simple version of batch delete. It will fail if there are any failures. + * @param deletes list of things to delete. + * @param operationConfig the operation configuration for this call, can be null. + * @return A {@link CompletableFuture} that always returns null when complete normally. + */ + default CompletableFuture deleteAll(List deletes, + @Nullable OperationConfig operationConfig) { + return voidBatchAll(this, deletes, operationConfig); + } + + /** + * Method that does a batch call on Deletes, Gets, Puts, Increments and Appends. The ordering of + * execution of the actions is not defined. Meaning if you do a Put and a Get in the same + * {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put + * had put. + * @param actions list of Get, Put, Delete, Increment, Append objects + * @return A list of {@link CompletableFuture}s that represent the result for each action. + */ + default List> batch(List actions) { + return batch(actions, null); } /** @@ -443,9 +675,11 @@ public interface AsyncTableBase { * {@link #batch} call, you will not necessarily be guaranteed that the Get returns what the Put * had put. * @param actions list of Get, Put, Delete, Increment, Append objects + * @param operationConfig the operation configuration for this call, can be null. * @return A list of {@link CompletableFuture}s that represent the result for each action. */ - List> batch(List actions); + List> batch(List actions, + OperationConfig operationConfig); /** * A simple version of batch. It will fail if there are any failures and you will get the whole @@ -454,7 +688,19 @@ public interface AsyncTableBase { * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}. */ default CompletableFuture> batchAll(List actions) { - List> futures = batch(actions); + return batchAll(actions, null); + } + + /** + * A simple version of batch. It will fail if there are any failures and you will get the whole + * result list at once if the operation is succeeded. + * @param actions list of Get, Put, Delete, Increment, Append objects + * @param operationConfig the operation configuration for this call, can be null. + * @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}. + */ + default CompletableFuture> batchAll(List actions, + OperationConfig operationConfig) { + List> futures = batch(actions, operationConfig); return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList())); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java index 7281185..473ddaa 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java @@ -21,12 +21,12 @@ import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.OperationConfig.OperationConfigBuilder; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -59,43 +59,8 @@ class AsyncTableImpl implements AsyncTable { } @Override - public void setReadRpcTimeout(long timeout, TimeUnit unit) { - rawTable.setReadRpcTimeout(timeout, unit); - } - - @Override - public long getReadRpcTimeout(TimeUnit unit) { - return rawTable.getReadRpcTimeout(unit); - } - - @Override - public void setWriteRpcTimeout(long timeout, TimeUnit unit) { - rawTable.setWriteRpcTimeout(timeout, unit); - } - - @Override - public long getWriteRpcTimeout(TimeUnit unit) { - return rawTable.getWriteRpcTimeout(unit); - } - - @Override - public void setOperationTimeout(long timeout, TimeUnit unit) { - rawTable.setOperationTimeout(timeout, unit); - } - - @Override - public long getOperationTimeout(TimeUnit unit) { - return rawTable.getOperationTimeout(unit); - } - - @Override - public void setScanTimeout(long timeout, TimeUnit unit) { - rawTable.setScanTimeout(timeout, unit); - } - - @Override - public long getScanTimeout(TimeUnit unit) { - return rawTable.getScanTimeout(unit); + public OperationConfigBuilder newOperationConfig() { + return rawTable.newOperationConfig(); } private CompletableFuture wrap(CompletableFuture future) { @@ -111,56 +76,59 @@ class AsyncTableImpl implements AsyncTable { } @Override - public CompletableFuture get(Get get) { - return wrap(rawTable.get(get)); + public CompletableFuture get(Get get, OperationConfig retryConfig) { + return wrap(rawTable.get(get, retryConfig)); } @Override - public CompletableFuture put(Put put) { - return wrap(rawTable.put(put)); + public CompletableFuture put(Put put, OperationConfig retryConfig) { + return wrap(rawTable.put(put, retryConfig)); } @Override - public CompletableFuture delete(Delete delete) { - return wrap(rawTable.delete(delete)); + public CompletableFuture delete(Delete delete, OperationConfig retryConfig) { + return wrap(rawTable.delete(delete, retryConfig)); } @Override - public CompletableFuture append(Append append) { - return wrap(rawTable.append(append)); + public CompletableFuture append(Append append, OperationConfig retryConfig) { + return wrap(rawTable.append(append, retryConfig)); } @Override - public CompletableFuture increment(Increment increment) { - return wrap(rawTable.increment(increment)); + public CompletableFuture increment(Increment increment, OperationConfig retryConfig) { + return wrap(rawTable.increment(increment, retryConfig)); } @Override public CompletableFuture checkAndPut(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, Put put) { - return wrap(rawTable.checkAndPut(row, family, qualifier, compareOp, value, put)); + CompareOp compareOp, byte[] value, Put put, OperationConfig retryConfig) { + return wrap(rawTable.checkAndPut(row, family, qualifier, compareOp, value, put, retryConfig)); } @Override public CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, Delete delete) { - return wrap(rawTable.checkAndDelete(row, family, qualifier, compareOp, value, delete)); + CompareOp compareOp, byte[] value, Delete delete, OperationConfig retryConfig) { + return wrap( + rawTable.checkAndDelete(row, family, qualifier, compareOp, value, delete, retryConfig)); } @Override - public CompletableFuture mutateRow(RowMutations mutation) { - return wrap(rawTable.mutateRow(mutation)); + public CompletableFuture mutateRow(RowMutations mutation, OperationConfig retryConfig) { + return wrap(rawTable.mutateRow(mutation, retryConfig)); } @Override public CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, RowMutations mutation) { - return wrap(rawTable.checkAndMutate(row, family, qualifier, compareOp, value, mutation)); + CompareOp compareOp, byte[] value, RowMutations mutation, OperationConfig retryConfig) { + return wrap( + rawTable.checkAndMutate(row, family, qualifier, compareOp, value, mutation, retryConfig)); } @Override - public CompletableFuture> smallScan(Scan scan, int limit) { - return wrap(rawTable.smallScan(scan, limit)); + public CompletableFuture> smallScan(Scan scan, int limit, + OperationConfig retryConfig) { + return wrap(rawTable.smallScan(scan, limit, retryConfig)); } private long resultSize2CacheSize(long maxResultSize) { @@ -169,14 +137,15 @@ class AsyncTableImpl implements AsyncTable { } @Override - public ResultScanner getScanner(Scan scan) { + public ResultScanner getScanner(Scan scan, OperationConfig retryConfig) { return new AsyncTableResultScanner(rawTable, ReflectionUtils.newInstance(scan.getClass(), scan), resultSize2CacheSize( - scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize)); + scan.getMaxResultSize() > 0 ? scan.getMaxResultSize() : defaultScannerMaxResultSize), + retryConfig); } - private void scan0(Scan scan, ScanResultConsumer consumer) { - try (ResultScanner scanner = getScanner(scan)) { + private void scan0(Scan scan, ScanResultConsumer consumer, OperationConfig retryConfig) { + try (ResultScanner scanner = getScanner(scan, retryConfig)) { for (Result result; (result = scanner.next()) != null;) { if (!consumer.onNext(result)) { break; @@ -189,12 +158,14 @@ class AsyncTableImpl implements AsyncTable { } @Override - public void scan(Scan scan, ScanResultConsumer consumer) { - pool.execute(() -> scan0(scan, consumer)); + public void scan(Scan scan, ScanResultConsumer consumer, OperationConfig retryConfig) { + pool.execute(() -> scan0(scan, consumer, retryConfig)); } @Override - public List> batch(List actions) { - return rawTable. batch(actions).stream().map(this::wrap).collect(Collectors.toList()); + public List> batch(List actions, + OperationConfig retryConfig) { + return rawTable. batch(actions, retryConfig).stream().map(this::wrap) + .collect(Collectors.toList()); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java index e2c4ec3..5b1f407 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java @@ -49,6 +49,8 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { private final long maxCacheSize; + private final OperationConfig retryConfig; + private final Queue queue = new ArrayDeque<>(); private long cacheSize; @@ -64,11 +66,13 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { // used to filter out cells that already returned when we restart a scan private Cell lastCell; - public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize) { + public AsyncTableResultScanner(RawAsyncTable table, Scan scan, long maxCacheSize, + OperationConfig retryConfig) { this.rawTable = table; this.scan = scan; this.maxCacheSize = maxCacheSize; - table.scan(scan, this); + this.retryConfig = retryConfig; + table.scan(scan, this, retryConfig); } private void addToCache(Result result) { @@ -165,7 +169,7 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer { LOG.debug(String.format("0x%x", System.identityHashCode(this)) + " resume prefetching"); } prefetchStopped = false; - rawTable.scan(scan, this); + rawTable.scan(scan, this, retryConfig); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 6f4a844..5dbd4a4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -342,14 +342,15 @@ public final class ConnectionUtils { return gets.stream().map(ConnectionUtils::toCheckExistenceOnly).collect(toList()); } - static List> voidBatch(AsyncTableBase table, - List actions) { - return table. batch(actions).stream().map(f -> f. thenApply(r -> null)) - .collect(toList()); + static List> voidBatch(AsyncTableBase table, List actions, + OperationConfig operationConfig) { + return table. batch(actions, operationConfig).stream() + .map(f -> f. thenApply(r -> null)).collect(toList()); } - static CompletableFuture voidBatchAll(AsyncTableBase table, List actions) { - return table. batchAll(actions).thenApply(r -> null); + static CompletableFuture voidBatchAll(AsyncTableBase table, List actions, + OperationConfig operationConfig) { + return table. batchAll(actions, operationConfig).thenApply(r -> null); } static RegionLocateType getLocateType(Scan scan) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationConfig.java new file mode 100644 index 0000000..23eb29d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationConfig.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; + +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Contains configurations for an operation. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class OperationConfig { + + private final long rpcTimeoutNs; + + private final long operationTimeoutNs; + + private final long pauseNs; + + private final int maxAttempts; + + private final int startLogErrorsCnt; + + private OperationConfig(long rpcTimeoutNs, long operationTimeoutNs, long pauseNs, int maxAttempts, + int startLogErrorsCnt) { + this.rpcTimeoutNs = rpcTimeoutNs; + this.operationTimeoutNs = operationTimeoutNs; + this.pauseNs = pauseNs; + this.maxAttempts = maxAttempts; + this.startLogErrorsCnt = startLogErrorsCnt; + } + + public long getRpcTimeout(TimeUnit unit) { + return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS); + } + + public long getOperationTimeout(TimeUnit unit) { + return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); + } + + public long getRetryPause(TimeUnit unit) { + return unit.convert(pauseNs, TimeUnit.NANOSECONDS); + } + + public long getRpcTimeoutNs() { + return rpcTimeoutNs; + } + + public long getOperationTimeoutNs() { + return operationTimeoutNs; + } + + public long getRetryPauseNs() { + return pauseNs; + } + + public int getMaxAttempts() { + return maxAttempts; + } + + public int getStartLogErrorsCnt() { + return startLogErrorsCnt; + } + + @InterfaceAudience.Public + @InterfaceStability.Unstable + public static final class OperationConfigBuilder { + + private long rpcTimeoutNs; + + private long operationTimeoutNs; + + private long pauseNs; + + private int maxAttempts; + + private int startLogErrorsCnt; + + OperationConfigBuilder() { + } + + OperationConfigBuilder(OperationConfig retryConfig) { + this.rpcTimeoutNs = retryConfig.rpcTimeoutNs; + this.operationTimeoutNs = retryConfig.operationTimeoutNs; + this.pauseNs = retryConfig.pauseNs; + this.maxAttempts = retryConfig.maxAttempts; + this.startLogErrorsCnt = retryConfig.startLogErrorsCnt; + } + + public OperationConfigBuilder setRpcTimeout(long timeout, TimeUnit unit) { + this.rpcTimeoutNs = unit.toNanos(timeout); + return this; + } + + public OperationConfigBuilder setOperationTimeout(long timeout, TimeUnit unit) { + this.operationTimeoutNs = unit.toNanos(timeout); + return this; + } + + public OperationConfigBuilder setRetryPause(long timeout, TimeUnit unit) { + this.pauseNs = unit.toNanos(timeout); + return this; + } + + public OperationConfigBuilder setMaxAttempts(int maxAttempts) { + this.maxAttempts = maxAttempts; + return this; + } + + public OperationConfigBuilder setMaxRetries(int maxRetries) { + return setMaxAttempts(retries2Attempts(maxRetries)); + } + + public OperationConfigBuilder setStartLogErrorsCnt(int startLogErrorsCnt) { + this.startLogErrorsCnt = startLogErrorsCnt; + return this; + } + + public OperationConfig build() { + return new OperationConfig(rpcTimeoutNs, operationTimeoutNs, pauseNs, maxAttempts, + startLogErrorsCnt); + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java index 0c292a6..c0c1c3f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import edu.umd.cs.findbugs.annotations.Nullable; + /** * A low level asynchronous table. *

@@ -57,5 +59,27 @@ public interface RawAsyncTable extends AsyncTableBase { * @param scan A configured {@link Scan} object. * @param consumer the consumer used to receive results. */ - void scan(Scan scan, RawScanResultConsumer consumer); + default void scan(Scan scan, RawScanResultConsumer consumer) { + scan(scan, consumer, null); + } + + /** + * The basic scan API uses the observer pattern. All results that match the given scan object will + * be passed to the given {@code consumer} by calling + * {@link RawScanResultConsumer#onNext(Result[])}. {@link RawScanResultConsumer#onComplete()} + * means the scan is finished, and {@link RawScanResultConsumer#onError(Throwable)} means we hit + * an unrecoverable error and the scan is terminated. {@link RawScanResultConsumer#onHeartbeat()} + * means the RS is still working but we can not get a valid result to call + * {@link RawScanResultConsumer#onNext(Result[])}. This is usually because the matched results are + * too sparse, for example, a filter which almost filters out everything is specified. + *

+ * Notice that, the methods of the given {@code consumer} will be called directly in the rpc + * framework's callback thread, so typically you should not do any time consuming work inside + * these methods, otherwise you will be likely to block at least one connection to RS(even more if + * the rpc framework uses NIO). + * @param scan A configured {@link Scan} object. + * @param consumer the consumer used to receive results. + * @param operationConfig the operation configuration for this call, can be null. + */ + void scan(Scan scan, RawScanResultConsumer consumer, @Nullable OperationConfig operationConfig); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 347c85b..23da641 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies; import java.io.IOException; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder; +import org.apache.hadoop.hbase.client.OperationConfig.OperationConfigBuilder; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.HBaseRpcController; @@ -67,24 +69,33 @@ class RawAsyncTableImpl implements RawAsyncTable { private final long defaultScannerMaxResultSize; - private long readRpcTimeoutNs; + private final OperationConfig defaultOperationConfig; - private long writeRpcTimeoutNs; + private final OperationConfig defaultReadOperationConfig; - private long operationTimeoutNs; + private final OperationConfig defaultWriteOperationConfig; - private long scanTimeoutNs; + private final OperationConfig defaultScanOperationConfig; public RawAsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) { this.conn = conn; this.tableName = tableName; - this.readRpcTimeoutNs = conn.connConf.getReadRpcTimeoutNs(); - this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs(); - this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs() - : conn.connConf.getOperationTimeoutNs(); this.defaultScannerCaching = conn.connConf.getScannerCaching(); this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize(); - this.scanTimeoutNs = conn.connConf.getScanTimeoutNs(); + this.defaultOperationConfig = new OperationConfigBuilder() + .setRpcTimeout(conn.connConf.getRpcTimeoutNs(), TimeUnit.NANOSECONDS) + .setOperationTimeout(tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs() + : conn.connConf.getOperationTimeoutNs(), + TimeUnit.NANOSECONDS) + .setRetryPause(conn.connConf.getPauseNs(), TimeUnit.NANOSECONDS) + .setMaxRetries(conn.connConf.getMaxRetries()) + .setStartLogErrorsCnt(conn.connConf.getStartLogErrorsCnt()).build(); + this.defaultReadOperationConfig = new OperationConfigBuilder(defaultOperationConfig) + .setRpcTimeout(conn.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS).build(); + this.defaultWriteOperationConfig = new OperationConfigBuilder(defaultOperationConfig) + .setRpcTimeout(conn.connConf.getWriteRpcTimeoutNs(), TimeUnit.NANOSECONDS).build(); + this.defaultScanOperationConfig = new OperationConfigBuilder(defaultReadOperationConfig) + .setOperationTimeout(conn.connConf.getScanTimeoutNs(), TimeUnit.NANOSECONDS).build(); } @Override @@ -97,6 +108,11 @@ class RawAsyncTableImpl implements RawAsyncTable { return conn.getConfiguration(); } + @Override + public OperationConfigBuilder newOperationConfig() { + return new OperationConfigBuilder(defaultOperationConfig); + } + @FunctionalInterface private interface Converter { D convert(I info, S src) throws IOException; @@ -175,19 +191,34 @@ class RawAsyncTableImpl implements RawAsyncTable { (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); } - private SingleRequestCallerBuilder newCaller(byte[] row, long rpcTimeoutNs) { + private SingleRequestCallerBuilder newCaller(byte[] row, OperationConfig operationConfig) { return conn.callerFactory. single().table(tableName).row(row) - .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) - .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS); + .operationConfig(operationConfig); + } + + private SingleRequestCallerBuilder newCaller(Row row, OperationConfig operationConfig) { + return newCaller(row.getRow(), operationConfig); + } + + private OperationConfig readConfig(OperationConfig operationConfig) { + return Optional.ofNullable(operationConfig).orElse(defaultReadOperationConfig); + } + + private OperationConfig writeConfig(OperationConfig operationConfig) { + return Optional.ofNullable(operationConfig).orElse(defaultWriteOperationConfig); } - private SingleRequestCallerBuilder newCaller(Row row, long rpcTimeoutNs) { - return newCaller(row.getRow(), rpcTimeoutNs); + private OperationConfig scanConfig(OperationConfig operationConfig) { + return Optional.ofNullable(operationConfig).orElse(defaultScanOperationConfig); + } + + private OperationConfig config(OperationConfig operationConfig) { + return Optional.ofNullable(operationConfig).orElse(defaultOperationConfig); } @Override - public CompletableFuture get(Get get) { - return this. newCaller(get, readRpcTimeoutNs) + public CompletableFuture get(Get get, OperationConfig operationConfig) { + return this. newCaller(get, readConfig(operationConfig)) .action((controller, loc, stub) -> RawAsyncTableImpl . call(controller, loc, stub, get, RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done), @@ -196,34 +227,34 @@ class RawAsyncTableImpl implements RawAsyncTable { } @Override - public CompletableFuture put(Put put) { - return this. newCaller(put, writeRpcTimeoutNs) + public CompletableFuture put(Put put, OperationConfig operationConfig) { + return this. newCaller(put, writeConfig(operationConfig)) .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, put, RequestConverter::buildMutateRequest)) .call(); } @Override - public CompletableFuture delete(Delete delete) { - return this. newCaller(delete, writeRpcTimeoutNs) + public CompletableFuture delete(Delete delete, OperationConfig operationConfig) { + return this. newCaller(delete, writeConfig(operationConfig)) .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, delete, RequestConverter::buildMutateRequest)) .call(); } @Override - public CompletableFuture append(Append append) { + public CompletableFuture append(Append append, OperationConfig operationConfig) { checkHasFamilies(append); - return this. newCaller(append, writeRpcTimeoutNs) + return this. newCaller(append, config(operationConfig)) .action((controller, loc, stub) -> this. noncedMutate(controller, loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) .call(); } @Override - public CompletableFuture increment(Increment increment) { + public CompletableFuture increment(Increment increment, OperationConfig operationConfig) { checkHasFamilies(increment); - return this. newCaller(increment, writeRpcTimeoutNs) + return this. newCaller(increment, config(operationConfig)) .action((controller, loc, stub) -> this. noncedMutate(controller, loc, stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) .call(); @@ -231,8 +262,8 @@ class RawAsyncTableImpl implements RawAsyncTable { @Override public CompletableFuture checkAndPut(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, Put put) { - return this. newCaller(row, writeRpcTimeoutNs) + CompareOp compareOp, byte[] value, Put put, OperationConfig operationConfig) { + return this. newCaller(row, config(operationConfig)) .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, loc, stub, put, (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, @@ -243,8 +274,8 @@ class RawAsyncTableImpl implements RawAsyncTable { @Override public CompletableFuture checkAndDelete(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, Delete delete) { - return this. newCaller(row, writeRpcTimeoutNs) + CompareOp compareOp, byte[] value, Delete delete, OperationConfig operationConfig) { + return this. newCaller(row, config(operationConfig)) .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, loc, stub, delete, (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, @@ -297,21 +328,21 @@ class RawAsyncTableImpl implements RawAsyncTable { } @Override - public CompletableFuture mutateRow(RowMutations mutation) { - return this. newCaller(mutation, writeRpcTimeoutNs).action((controller, loc, + public CompletableFuture mutateRow(RowMutations mutation, OperationConfig operationConfig) { + return this. newCaller(mutation, config(operationConfig)).action((controller, loc, stub) -> RawAsyncTableImpl. mutateRow(controller, loc, stub, mutation, (rn, rm) -> { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm); regionMutationBuilder.setAtomic(true); return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); - }, (resp) -> { + }, resp -> { return null; })).call(); } @Override public CompletableFuture checkAndMutate(byte[] row, byte[] family, byte[] qualifier, - CompareOp compareOp, byte[] value, RowMutations mutation) { - return this. newCaller(mutation, writeRpcTimeoutNs) + CompareOp compareOp, byte[] value, RowMutations mutation, OperationConfig operationConfig) { + return this. newCaller(mutation, config(operationConfig)) .action((controller, loc, stub) -> RawAsyncTableImpl. mutateRow(controller, loc, stub, mutation, (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, @@ -339,7 +370,8 @@ class RawAsyncTableImpl implements RawAsyncTable { } @Override - public CompletableFuture> smallScan(Scan scan, int limit) { + public CompletableFuture> smallScan(Scan scan, int limit, + OperationConfig operationConfig) { if (!scan.isSmall()) { return failedFuture(new IllegalArgumentException("Only small scan is allowed")); } @@ -348,11 +380,10 @@ class RawAsyncTableImpl implements RawAsyncTable { new IllegalArgumentException("Batch and allowPartial is not allowed for small scan")); } return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan)) - .limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS) - .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call(); + .limit(limit).operationConfig(scanConfig(operationConfig)).call(); } - public void scan(Scan scan, RawScanResultConsumer consumer) { + public void scan(Scan scan, RawScanResultConsumer consumer, OperationConfig operationConfig) { if (scan.isSmall()) { if (scan.getBatch() > 0 || scan.getAllowPartialResults()) { consumer.onError( @@ -362,55 +393,14 @@ class RawAsyncTableImpl implements RawAsyncTable { } } scan = setDefaultScanConfig(scan); - new AsyncClientScanner(scan, consumer, tableName, conn, scanTimeoutNs, readRpcTimeoutNs) - .start(); - } - - @Override - public void setReadRpcTimeout(long timeout, TimeUnit unit) { - this.readRpcTimeoutNs = unit.toNanos(timeout); - } - - @Override - public long getReadRpcTimeout(TimeUnit unit) { - return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS); - } - - @Override - public void setWriteRpcTimeout(long timeout, TimeUnit unit) { - this.writeRpcTimeoutNs = unit.toNanos(timeout); - } - - @Override - public long getWriteRpcTimeout(TimeUnit unit) { - return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS); - } - - @Override - public void setOperationTimeout(long timeout, TimeUnit unit) { - this.operationTimeoutNs = unit.toNanos(timeout); - } - - @Override - public long getOperationTimeout(TimeUnit unit) { - return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS); - } - - @Override - public void setScanTimeout(long timeout, TimeUnit unit) { - this.scanTimeoutNs = unit.toNanos(timeout); + new AsyncClientScanner(scan, consumer, tableName, conn, scanConfig(operationConfig)).start(); } @Override - public long getScanTimeout(TimeUnit unit) { - return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit); - } - - @Override - public List> batch(List actions) { + public List> batch(List actions, + OperationConfig operationConfig) { return conn.callerFactory.batch().table(tableName).actions(actions) - .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) - .readRpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS) - .writeRpcTimeout(writeRpcTimeoutNs, TimeUnit.NANOSECONDS).call(); + .operationConfig(config(operationConfig)).call(); } + } \ No newline at end of file diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 1eec691..609e9a5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -875,10 +875,7 @@ public final class HConstants { /** * timeout for each RPC - * @deprecated Use {@link #HBASE_RPC_READ_TIMEOUT_KEY} or {@link #HBASE_RPC_WRITE_TIMEOUT_KEY} - * instead. */ - @Deprecated public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout"; /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java index 4a391e0..be4466d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.OperationConfig.OperationConfigBuilder; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -60,7 +61,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller { private static byte[] VALUE = Bytes.toBytes("value"); - private AsyncConnectionImpl asyncConn; + private AsyncConnectionImpl conn; @BeforeClass public static void setUpBeforeClass() throws Exception { @@ -77,9 +78,9 @@ public class TestAsyncSingleRequestRpcRetryingCaller { @After public void tearDown() { - if (asyncConn != null) { - asyncConn.close(); - asyncConn = null; + if (conn != null) { + conn.close(); + conn = null; } } @@ -88,18 +89,18 @@ public class TestAsyncSingleRequestRpcRetryingCaller { conf.setInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, startLogErrorsCnt); conf.setLong(HConstants.HBASE_CLIENT_PAUSE, pauseMs); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, maxRetires); - asyncConn = new AsyncConnectionImpl(conf, User.getCurrent()); + conn = new AsyncConnectionImpl(conf, User.getCurrent()); } @Test public void testRegionMove() throws InterruptedException, ExecutionException, IOException { initConn(0, 100, 30); // This will leave a cached entry in location cache - HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get(); + HRegionLocation loc = conn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get(); int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegionInfo().getRegionName()); TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes( TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName())); - RawAsyncTable table = asyncConn.getRawTable(TABLE_NAME); + RawAsyncTable table = conn.getRawTable(TABLE_NAME); table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get(); // move back @@ -115,11 +116,21 @@ public class TestAsyncSingleRequestRpcRetryingCaller { return future; } + private OperationConfigBuilder newOperationConfig() { + return new OperationConfigBuilder() + .setRpcTimeout(conn.connConf.getReadRpcTimeoutNs(), TimeUnit.NANOSECONDS) + .setOperationTimeout(conn.connConf.getOperationTimeoutNs(), TimeUnit.NANOSECONDS) + .setRetryPause(conn.connConf.getPauseNs(), TimeUnit.NANOSECONDS) + .setMaxRetries(conn.connConf.getMaxRetries()) + .setStartLogErrorsCnt(conn.connConf.getStartLogErrorsCnt()); + } + @Test public void testMaxRetries() throws IOException, InterruptedException { initConn(0, 10, 2); try { - asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS) + conn.callerFactory.single().table(TABLE_NAME).row(ROW) + .operationConfig(newOperationConfig().setOperationTimeout(1, TimeUnit.DAYS).build()) .action((controller, loc, stub) -> failedFuture()).call().get(); fail(); } catch (ExecutionException e) { @@ -132,9 +143,9 @@ public class TestAsyncSingleRequestRpcRetryingCaller { initConn(0, 100, Integer.MAX_VALUE); long startNs = System.nanoTime(); try { - asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW) - .operationTimeout(1, TimeUnit.SECONDS).action((controller, loc, stub) -> failedFuture()) - .call().get(); + conn.callerFactory.single().table(TABLE_NAME).row(ROW) + .operationConfig(newOperationConfig().setOperationTimeout(1, TimeUnit.SECONDS).build()) + .action((controller, loc, stub) -> failedFuture()).call().get(); fail(); } catch (ExecutionException e) { assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class)); @@ -149,9 +160,9 @@ public class TestAsyncSingleRequestRpcRetryingCaller { initConn(0, 100, 5); AtomicBoolean errorTriggered = new AtomicBoolean(false); AtomicInteger count = new AtomicInteger(0); - HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get(); + HRegionLocation loc = conn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get(); AsyncRegionLocator mockedLocator = - new AsyncRegionLocator(asyncConn, AsyncConnectionImpl.RETRY_TIMER) { + new AsyncRegionLocator(conn, AsyncConnectionImpl.RETRY_TIMER) { @Override CompletableFuture getRegionLocation(TableName tableName, byte[] row, RegionLocateType locateType, long timeoutNs) { @@ -174,7 +185,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller { } }; try (AsyncConnectionImpl mockedConn = - new AsyncConnectionImpl(asyncConn.getConfiguration(), User.getCurrent()) { + new AsyncConnectionImpl(conn.getConfiguration(), User.getCurrent()) { @Override AsyncRegionLocator getLocator() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java index da8141b..033e23e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java @@ -71,6 +71,8 @@ public class TestAsyncTableGetMultiThreaded { private static AsyncConnection CONN; + private static RawAsyncTable TABLE; + private static byte[][] SPLIT_KEYS; @BeforeClass @@ -88,10 +90,10 @@ public class TestAsyncTableGetMultiThreaded { TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()); - CONN.getRawTable(TABLE_NAME) - .putAll( - IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) - .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))).collect(Collectors.toList())) + TABLE = CONN.getRawTable(TABLE_NAME); + TABLE.putAll( + IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i))) + .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))).collect(Collectors.toList())) .get(); } @@ -104,10 +106,8 @@ public class TestAsyncTableGetMultiThreaded { private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException { while (!stop.get()) { int i = ThreadLocalRandom.current().nextInt(COUNT); - assertEquals(i, - Bytes.toInt( - CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i)))).get() - .getValue(FAMILY, QUALIFIER))); + assertEquals(i, Bytes.toInt(TABLE.get(new Get(Bytes.toBytes(String.format("%03d", i)))).get() + .getValue(FAMILY, QUALIFIER))); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java index 270e3e1..9f3970b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Queue; -import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -122,10 +121,7 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan { @Override protected List doScan(Scan scan) throws Exception { SimpleRawScanResultConsumer scanConsumer = new SimpleRawScanResultConsumer(); - RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME); - table.setScanTimeout(1, TimeUnit.HOURS); - table.setReadRpcTimeout(1, TimeUnit.HOURS); - table.scan(scan, scanConsumer); + ASYNC_CONN.getRawTable(TABLE_NAME).scan(scan, scanConsumer); List results = new ArrayList<>(); for (Result result; (result = scanConsumer.take()) != null;) { results.add(result); -- 1.9.1