From 1697b221047206480112dd322c78cf265787ecf2 Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Mon, 18 Feb 2019 16:20:32 +0800 Subject: [PATCH] HBASE-21907 Should set priority for rpc request --- .../AsyncAdminRequestRetryingCaller.java | 6 +- .../client/AsyncBatchRpcRetryingCaller.java | 17 +- .../AsyncMasterRequestRpcRetryingCaller.java | 4 +- .../hbase/client/AsyncRpcRetryingCaller.java | 11 +- .../client/AsyncRpcRetryingCallerFactory.java | 111 +++- ...syncScanSingleRegionRpcRetryingCaller.java | 20 +- .../AsyncServerRequestRpcRetryingCaller.java | 5 +- .../AsyncSingleRequestRpcRetryingCaller.java | 4 +- .../hadoop/hbase/client/ConnectionUtils.java | 34 +- .../org/apache/hadoop/hbase/client/Put.java | 5 + .../hbase/client/RawAsyncHBaseAdmin.java | 133 +++-- .../hbase/client/RawAsyncTableImpl.java | 31 +- .../client/TestAsyncAdminRpcPriority.java | 224 +++++++ .../client/TestAsyncTableRpcPriority.java | 554 ++++++++++++++++++ 14 files changed, 1049 insertions(+), 110 deletions(-) create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java create mode 100644 hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java index 02e22c052a..ce0fca7eb0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java @@ -43,11 +43,11 @@ public class AsyncAdminRequestRetryingCaller extends AsyncRpcRetryingCaller callable; private ServerName serverName; - public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, + public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { - super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 4051e1d34f..4e983e5a0b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.CellUtil.createCellScanner; import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; +import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority; import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; @@ -45,6 +46,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -129,6 +131,11 @@ class AsyncBatchRpcRetryingCaller { computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(), () -> new RegionRequest(loc)).actions.add(action); } + + public int getPriority() { + return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream()) + .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET); + } } public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, @@ -148,7 +155,12 @@ class AsyncBatchRpcRetryingCaller { this.action2Future = new IdentityHashMap<>(actions.size()); for (int i = 0, n = actions.size(); i < n; i++) { Row rawAction = actions.get(i); - Action action = new Action(rawAction, i); + Action action; + if (rawAction instanceof OperationWithAttributes) { + action = new Action(rawAction, i, ((OperationWithAttributes) rawAction).getPriority()); + } else { + action = new Action(rawAction, i); + } if (rawAction instanceof Append || rawAction instanceof Increment) { action.setNonce(conn.getNonceGenerator().newNonce()); } @@ -341,7 +353,8 @@ class AsyncBatchRpcRetryingCaller { return; } HBaseRpcController controller = conn.rpcControllerFactory.newController(); - resetController(controller, Math.min(rpcTimeoutNs, remainingNs)); + resetController(controller, Math.min(rpcTimeoutNs, remainingNs), + calcPriority(serverReq.getPriority(), tableName)); if (!cells.isEmpty()) { controller.setCellScanner(createCellScanner(cells)); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java index 7ed44e269b..e5594cbac7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java @@ -42,9 +42,9 @@ public class AsyncMasterRequestRpcRetryingCaller extends AsyncRpcRetryingCall private final Callable callable; public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, - Callable callable, long pauseNs, int maxRetries, long operationTimeoutNs, + Callable callable, int priority, long pauseNs, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { - super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs, + super(retryTimer, conn, priority, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index a886b49c9e..45266e9be2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -52,6 +52,8 @@ public abstract class AsyncRpcRetryingCaller { private final Timer retryTimer; + private final int priority; + private final long startNs; private final long pauseNs; @@ -74,10 +76,12 @@ public abstract class AsyncRpcRetryingCaller { protected final HBaseRpcController controller; - public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs, - int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, + long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, + int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; + this.priority = priority; this.pauseNs = pauseNs; this.maxAttempts = maxAttempts; this.operationTimeoutNs = operationTimeoutNs; @@ -85,6 +89,7 @@ public abstract class AsyncRpcRetryingCaller { this.startLogErrorsCnt = startLogErrorsCnt; this.future = new CompletableFuture<>(); this.controller = conn.rpcControllerFactory.newController(); + this.controller.setPriority(priority); this.exceptions = new ArrayList<>(); this.startNs = System.nanoTime(); } @@ -113,7 +118,7 @@ public abstract class AsyncRpcRetryingCaller { } else { callTimeoutNs = rpcTimeoutNs; } - resetController(controller, callTimeoutNs); + resetController(controller, callTimeoutNs, priority); } private void tryScheduleRetry(Throwable error, Consumer updateCachedLocation) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index f019fc460a..70f032c584 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -17,10 +17,14 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.HConstants.META_QOS; +import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; +import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority; import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; +import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -29,7 +33,10 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.io.netty.util.Timer; @@ -43,6 +50,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRespon @InterfaceAudience.Private class AsyncRpcRetryingCallerFactory { + private static final Logger LOG = LoggerFactory.getLogger(AsyncRpcRetryingCallerFactory.class); + private final AsyncConnectionImpl conn; private final Timer retryTimer; @@ -77,6 +86,8 @@ class AsyncRpcRetryingCallerFactory { private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; + private int priority = PRIORITY_UNSET; + public SingleRequestCallerBuilder table(TableName tableName) { this.tableName = tableName; return this; @@ -128,12 +139,25 @@ class AsyncRpcRetryingCallerFactory { return this; } - public AsyncSingleRequestRpcRetryingCaller build() { + public SingleRequestCallerBuilder priority(int priority) { + this.priority = priority; + return this; + } + + private void preCheck() { checkArgument(replicaId >= 0, "invalid replica id %s", replicaId); - return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, - checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), replicaId, - checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"), - pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + checkNotNull(tableName, "tableName is null"); + checkNotNull(row, "row is null"); + checkNotNull(locateType, "locateType is null"); + checkNotNull(callable, "action is null"); + this.priority = calcPriority(priority, tableName); + } + + public AsyncSingleRequestRpcRetryingCaller build() { + preCheck(); + return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId, + locateType, callable, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); } /** @@ -175,6 +199,8 @@ class AsyncRpcRetryingCallerFactory { private long rpcTimeoutNs; + private int priority = PRIORITY_UNSET; + public ScanSingleRegionCallerBuilder id(long scannerId) { this.scannerId = scannerId; return this; @@ -182,6 +208,7 @@ class AsyncRpcRetryingCallerFactory { public ScanSingleRegionCallerBuilder setScan(Scan scan) { this.scan = scan; + this.priority = scan.getPriority(); return this; } @@ -246,14 +273,22 @@ class AsyncRpcRetryingCallerFactory { return this; } - public AsyncScanSingleRegionRpcRetryingCaller build() { + private void preCheck() { checkArgument(scannerId != null, "invalid scannerId %d", scannerId); - return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, - checkNotNull(scan, "scan is null"), scanMetrics, scannerId, - checkNotNull(resultCache, "resultCache is null"), - checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"), - checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs, - pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + checkNotNull(scan, "scan is null"); + checkNotNull(resultCache, "resultCache is null"); + checkNotNull(consumer, "consumer is null"); + checkNotNull(stub, "stub is null"); + checkNotNull(loc, "location is null"); + this.priority = calcPriority(priority, loc.getRegion().getTable()); + } + + public AsyncScanSingleRegionRpcRetryingCaller build() { + preCheck(); + return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics, + scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority, + scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); } /** @@ -338,6 +373,8 @@ class AsyncRpcRetryingCallerFactory { private long rpcTimeoutNs = -1L; + private int priority = PRIORITY_UNSET; + public MasterRequestCallerBuilder action( AsyncMasterRequestRpcRetryingCaller.Callable callable) { this.callable = callable; @@ -369,10 +406,43 @@ class AsyncRpcRetryingCallerFactory { return this; } + public MasterRequestCallerBuilder priority(TableName tableName) { + this.priority = Math.max(priority, ConnectionUtils.getPriority(tableName)); + return this; + } + + public MasterRequestCallerBuilder priority(byte[] regionNameOrEncodedRegionName) { + try { + if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { + if (Bytes.equals(regionNameOrEncodedRegionName, + RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) { + this.priority = Math.max(priority, META_QOS); + } + } else { + this.priority = Math.max(priority, + ConnectionUtils.getPriority(RegionInfo.getTable(regionNameOrEncodedRegionName))); + } + } catch (IOException e) { + // not a big deal but let's warn it + LOG.warn("Failed to parse region name {}", + Bytes.toStringBinary(regionNameOrEncodedRegionName), e); + } + return this; + } + + public MasterRequestCallerBuilder priority(int priority) { + this.priority = Math.max(this.priority, priority); + return this; + } + + private void preCheck() { + checkNotNull(callable, "action is null"); + } + public AsyncMasterRequestRpcRetryingCaller build() { - return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, - checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + preCheck(); + return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, callable, priority, + pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -398,6 +468,8 @@ class AsyncRpcRetryingCallerFactory { private ServerName serverName; + private int priority; + public AdminRequestCallerBuilder action( AsyncAdminRequestRetryingCaller.Callable callable) { this.callable = callable; @@ -434,9 +506,14 @@ class AsyncRpcRetryingCallerFactory { return this; } + public AdminRequestCallerBuilder priority(int priority) { + this.priority = priority; + return this; + } + public AsyncAdminRequestRetryingCaller build() { - return new AsyncAdminRequestRetryingCaller(retryTimer, conn, pauseNs, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, + return new AsyncAdminRequestRetryingCaller(retryTimer, conn, priority, pauseNs, + maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 584bfacf76..96961afc65 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -91,6 +91,8 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final boolean regionServerRemote; + private final int priority; + private final long scannerLeaseTimeoutPeriodNs; private final long pauseNs; @@ -298,11 +300,11 @@ class AsyncScanSingleRegionRpcRetryingCaller { } } - public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, - AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId, - ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub, - HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs, - long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, + Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache, + AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc, + boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs, + int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.scan = scan; this.scanMetrics = scanMetrics; @@ -324,7 +326,9 @@ class AsyncScanSingleRegionRpcRetryingCaller { completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion; } this.future = new CompletableFuture<>(); + this.priority = priority; this.controller = conn.rpcControllerFactory.newController(); + this.controller.setPriority(priority); this.exceptions = new ArrayList<>(); } @@ -338,7 +342,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private void closeScanner() { incRPCCallsMetrics(scanMetrics, regionServerRemote); - resetController(controller, rpcTimeoutNs); + resetController(controller, rpcTimeoutNs, priority); ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); stub.scan(controller, req, resp -> { if (controller.failed()) { @@ -558,7 +562,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { if (tries > 1) { incRPCRetriesMetrics(scanMetrics, regionServerRemote); } - resetController(controller, callTimeoutNs); + resetController(controller, callTimeoutNs, priority); ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false, nextCallSeq, false, false, scan.getLimit()); stub.scan(controller, req, resp -> onComplete(controller, resp)); @@ -575,7 +579,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private void renewLease() { incRPCCallsMetrics(scanMetrics, regionServerRemote); nextCallSeq++; - resetController(controller, rpcTimeoutNs); + resetController(controller, rpcTimeoutNs, priority); ScanRequest req = RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1); stub.scan(controller, req, resp -> { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java index f114eff5ba..63c85c210e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.yetus.audience.InterfaceAudience; @@ -47,8 +48,8 @@ public class AsyncServerRequestRpcRetryingCaller extends AsyncRpcRetryingCall public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { - super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, maxAttempts, operationTimeoutNs, + rpcTimeoutNs, startLogErrorsCnt); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 9490d0f0b6..9b0dede5ed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -56,9 +56,9 @@ class AsyncSingleRequestRpcRetryingCaller extends AsyncRpcRetryingCaller { public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, TableName tableName, byte[] row, int replicaId, RegionLocateType locateType, - Callable callable, long pauseNs, int maxAttempts, long operationTimeoutNs, + Callable callable, int priority, long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { - super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.tableName = tableName; this.row = row; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 3b6560fd10..2c5385406e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -298,12 +298,13 @@ public final class ConnectionUtils { return Bytes.equals(row, EMPTY_END_ROW); } - static void resetController(HBaseRpcController controller, long timeoutNs) { + static void resetController(HBaseRpcController controller, long timeoutNs, int priority) { controller.reset(); if (timeoutNs >= 0) { controller.setCallTimeout( (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs))); } + controller.setPriority(priority); } static Throwable translateException(Throwable t) { @@ -588,4 +589,35 @@ public final class ConnectionUtils { } } } + + /** + * Select the priority for the rpc call. + *

+ * The rules are: + *

    + *
  1. If user set a priority explicitly, then just use it.
  2. + *
  3. For meta table, use {@link HConstants#META_QOS}.
  4. + *
  5. For other system table, use {@link HConstants#SYSTEMTABLE_QOS}.
  6. + *
  7. For other tables, use {@link HConstants#NORMAL_QOS}.
  8. + *
+ * @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}. + * @param tableName the table we operate on + */ + static int calcPriority(int priority, TableName tableName) { + if (priority != HConstants.PRIORITY_UNSET) { + return priority; + } else { + return getPriority(tableName); + } + } + + static int getPriority(TableName tableName) { + if (TableName.isMetaTableName(tableName)) { + return HConstants.META_QOS; + } else if (tableName.isSystemTable()) { + return HConstants.SYSTEMTABLE_QOS; + } else { + return HConstants.NORMAL_QOS; + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java index db8eec51e0..702717038c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java @@ -339,4 +339,9 @@ public class Put extends Mutation implements HeapSize { public Put setTTL(long ttl) { return (Put) super.setTTL(ttl); } + + @Override + public Put setPriority(int priority) { + return (Put) super.setPriority(priority); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index d4b60fb8a3..52986c2e8f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; @@ -38,6 +39,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -393,7 +395,6 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private CompletableFuture adminCall(HBaseRpcController controller, AdminService.Interface stub, PREQ preq, AdminRpcCall rpcCall, Converter respConverter) { - CompletableFuture future = new CompletableFuture<>(); rpcCall.call(stub, controller, preq, new RpcCallback() { @@ -416,9 +417,30 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private CompletableFuture procedureCall(PREQ preq, MasterRpcCall rpcCall, Converter respConverter, ProcedureBiConsumer consumer) { - CompletableFuture procFuture = - this. newMasterCaller().action((controller, stub) -> this - . call(controller, stub, preq, rpcCall, respConverter)).call(); + return procedureCall(b -> { + }, preq, rpcCall, respConverter, consumer); + } + + private CompletableFuture procedureCall(TableName tableName, PREQ preq, + MasterRpcCall rpcCall, Converter respConverter, + ProcedureBiConsumer consumer) { + return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer); + } + + private CompletableFuture procedureCall(byte[] regionName, PREQ preq, + MasterRpcCall rpcCall, Converter respConverter, + ProcedureBiConsumer consumer) { + return procedureCall(b -> b.priority(regionName), preq, rpcCall, respConverter, consumer); + } + + private CompletableFuture procedureCall( + Consumer> prioritySetter, PREQ preq, + MasterRpcCall rpcCall, Converter respConverter, + ProcedureBiConsumer consumer) { + MasterRequestCallerBuilder builder = this. newMasterCaller().action((controller, + stub) -> this. call(controller, stub, preq, rpcCall, respConverter)); + prioritySetter.accept(builder); + CompletableFuture procFuture = builder.call(); CompletableFuture future = waitProcedureResult(procFuture); addListener(future, consumer); return future; @@ -515,7 +537,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture getDescriptor(TableName tableName) { CompletableFuture future = new CompletableFuture<>(); - addListener(this.> newMasterCaller() + addListener(this.> newMasterCaller().priority(tableName) .action((controller, stub) -> this .> call( controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), @@ -566,14 +588,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private CompletableFuture createTable(TableName tableName, CreateTableRequest request) { Preconditions.checkNotNull(tableName, "table name is null"); - return this. procedureCall(request, + return this. procedureCall(tableName, request, (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), new CreateTableProcedureBiConsumer(tableName)); } @Override public CompletableFuture modifyTable(TableDescriptor desc) { - return this. procedureCall( + return this. procedureCall(desc.getTableName(), RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName())); @@ -581,15 +603,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture deleteTable(TableName tableName) { - return this. procedureCall(RequestConverter - .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), + return this. procedureCall(tableName, + RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), new DeleteTableProcedureBiConsumer(tableName)); } @Override public CompletableFuture truncateTable(TableName tableName, boolean preserveSplits) { - return this. procedureCall( + return this. procedureCall(tableName, RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName)); @@ -597,16 +619,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture enableTable(TableName tableName) { - return this. procedureCall(RequestConverter - .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), + return this. procedureCall(tableName, + RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), new EnableTableProcedureBiConsumer(tableName)); } @Override public CompletableFuture disableTable(TableName tableName) { - return this. procedureCall(RequestConverter - .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), + return this. procedureCall(tableName, + RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), new DisableTableProcedureBiConsumer(tableName)); } @@ -725,7 +747,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) { - return this. procedureCall( + return this. procedureCall(tableName, RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), new AddColumnFamilyProcedureBiConsumer(tableName)); @@ -733,7 +755,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture deleteColumnFamily(TableName tableName, byte[] columnFamily) { - return this. procedureCall( + return this. procedureCall(tableName, RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName)); @@ -742,7 +764,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture modifyColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) { - return this. procedureCall( + return this. procedureCall(tableName, RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName)); @@ -1226,9 +1248,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } addListener( - this. procedureCall(request, - (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(), - new MergeTableRegionProcedureBiConsumer(tableName)), + this. procedureCall(nameOfRegionA, + request, (s, c, req, done) -> s.mergeTableRegions(c, req, done), + (resp) -> resp.getProcId(), new MergeTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); @@ -1403,9 +1425,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return future; } - addListener(this. procedureCall(request, - (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(), - new SplitTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> { + addListener( + this. procedureCall(hri.getTable(), + request, (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(), + new SplitTableRegionProcedureBiConsumer(tableName)), + (ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); } else { @@ -1423,7 +1447,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { future.completeExceptionally(err); return; } - addListener(this. newMasterCaller() + addListener(this. newMasterCaller().priority(regionName) .action(((controller, stub) -> this. call( controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))) @@ -1447,7 +1471,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return; } addListener( - this. newMasterCaller() + this. newMasterCaller().priority(regionName) .action(((controller, stub) -> this . call(controller, stub, RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible), @@ -1473,7 +1497,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return; } addListener( - this. newMasterCaller() + this. newMasterCaller().priority(regionName) .action(((controller, stub) -> this . call(controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), @@ -1499,7 +1523,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return; } addListener( - moveRegion( + moveRegion(regionName, RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), (ret, err2) -> { if (err2 != null) { @@ -1522,8 +1546,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { future.completeExceptionally(err); return; } - addListener(moveRegion(RequestConverter - .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), + addListener( + moveRegion(regionName, RequestConverter + .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), (ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); @@ -1535,12 +1560,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return future; } - private CompletableFuture moveRegion(MoveRegionRequest request) { - return this - . newMasterCaller() - .action( - (controller, stub) -> this. call(controller, - stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)).call(); + private CompletableFuture moveRegion(byte[] regionName, MoveRegionRequest request) { + return this. newMasterCaller().priority(regionName) + .action( + (controller, stub) -> this. call(controller, + stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) + .call(); } @Override @@ -2704,35 +2729,31 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture shutdown() { - return this - . newMasterCaller() - .action( - (controller, stub) -> this. call(controller, - stub, ShutdownRequest.newBuilder().build(), - (s, c, req, done) -> s.shutdown(c, req, done), resp -> null)).call(); + return this. newMasterCaller().priority(HIGH_QOS) + .action((controller, stub) -> this. call(controller, + stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done), + resp -> null)) + .call(); } @Override public CompletableFuture stopMaster() { - return this - . newMasterCaller() - .action( - (controller, stub) -> this. call(controller, - stub, StopMasterRequest.newBuilder().build(), - (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)).call(); + return this. newMasterCaller().priority(HIGH_QOS) + .action((controller, stub) -> this. call( + controller, stub, StopMasterRequest.newBuilder().build(), + (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)) + .call(); } @Override public CompletableFuture stopRegionServer(ServerName serverName) { - StopServerRequest request = - RequestConverter.buildStopServerRequest("Called by admin client " - + this.connection.toString()); - return this - . newAdminCaller() - .action( - (controller, stub) -> this. adminCall( - controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), - resp -> null)).serverName(serverName).call(); + StopServerRequest request = RequestConverter + .buildStopServerRequest("Called by admin client " + this.connection.toString()); + return this. newAdminCaller().priority(HIGH_QOS) + .action((controller, stub) -> this. adminCall( + controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), + resp -> null)) + .serverName(serverName).call(); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 96fa85d67c..789460ccd2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -206,20 +206,21 @@ class RawAsyncTableImpl implements AsyncTable { (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); } - private SingleRequestCallerBuilder newCaller(byte[] row, long rpcTimeoutNs) { - return conn.callerFactory. single().table(tableName).row(row) + private SingleRequestCallerBuilder newCaller(byte[] row, int priority, long rpcTimeoutNs) { + return conn.callerFactory. single().table(tableName).row(row).priority(priority) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) .startLogErrorsCnt(startLogErrorsCnt); } - private SingleRequestCallerBuilder newCaller(Row row, long rpcTimeoutNs) { - return newCaller(row.getRow(), rpcTimeoutNs); + private SingleRequestCallerBuilder newCaller( + R row, long rpcTimeoutNs) { + return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs); } private CompletableFuture get(Get get, int replicaId) { - return this. newCaller(get, readRpcTimeoutNs) + return this. newCaller(get, readRpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl . call(controller, loc, stub, get, RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done), @@ -237,7 +238,7 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture put(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); - return this. newCaller(put, writeRpcTimeoutNs) + return this. newCaller(put, writeRpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, put, RequestConverter::buildMutateRequest)) .call(); @@ -245,7 +246,7 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture delete(Delete delete) { - return this. newCaller(delete, writeRpcTimeoutNs) + return this. newCaller(delete, writeRpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, delete, RequestConverter::buildMutateRequest)) .call(); @@ -256,7 +257,7 @@ class RawAsyncTableImpl implements AsyncTable { checkHasFamilies(append); long nonceGroup = conn.getNonceGenerator().getNonceGroup(); long nonce = conn.getNonceGenerator().newNonce(); - return this. newCaller(append, rpcTimeoutNs) + return this. newCaller(append, rpcTimeoutNs) .action( (controller, loc, stub) -> this. noncedMutate(nonceGroup, nonce, controller, loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) @@ -268,7 +269,7 @@ class RawAsyncTableImpl implements AsyncTable { checkHasFamilies(increment); long nonceGroup = conn.getNonceGenerator().getNonceGroup(); long nonce = conn.getNonceGenerator().newNonce(); - return this. newCaller(increment, rpcTimeoutNs) + return this. newCaller(increment, rpcTimeoutNs) .action((controller, loc, stub) -> this. noncedMutate(nonceGroup, nonce, controller, loc, stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) @@ -330,7 +331,7 @@ class RawAsyncTableImpl implements AsyncTable { public CompletableFuture thenPut(Put put) { validatePut(put, conn.connConf.getMaxKeyValueSize()); preCheck(); - return RawAsyncTableImpl.this. newCaller(row, rpcTimeoutNs) + return RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, loc, stub, put, (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, @@ -342,7 +343,7 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture thenDelete(Delete delete) { preCheck(); - return RawAsyncTableImpl.this. newCaller(row, rpcTimeoutNs) + return RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, loc, stub, delete, (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, @@ -354,7 +355,8 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture thenMutate(RowMutations mutation) { preCheck(); - return RawAsyncTableImpl.this. newCaller(mutation, rpcTimeoutNs) + return RawAsyncTableImpl.this + . newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. mutateRow(controller, loc, stub, mutation, (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, @@ -412,8 +414,9 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture mutateRow(RowMutations mutation) { - return this. newCaller(mutation, writeRpcTimeoutNs).action((controller, loc, - stub) -> RawAsyncTableImpl. mutateRow(controller, loc, stub, mutation, (rn, rm) -> { + return this. newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl. mutateRow(controller, loc, stub, + mutation, (rn, rm) -> { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm); regionMutationBuilder.setAtomic(true); return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java new file mode 100644 index 0000000000..db00d89e83 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; +import static org.apache.hadoop.hbase.HConstants.META_QOS; +import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS; +import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS; +import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.Interface; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ShutdownResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; + +/** + * Confirm that we will set the priority in {@link HBaseRpcController} for several admin operations. + */ +@Category({ ClientTests.class, MediumTests.class }) +public class TestAsyncAdminRpcPriority { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncAdminRpcPriority.class); + + private static Configuration CONF = HBaseConfiguration.create(); + + private MasterService.Interface masterStub; + + private AdminService.Interface adminStub; + + private AsyncConnection conn; + + @Rule + public TestName name = new TestName(); + + @Before + public void setUp() throws IOException { + masterStub = mock(MasterService.Interface.class); + adminStub = mock(AdminService.Interface.class); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + RpcCallback done = invocation.getArgument(2); + done.run(GetProcedureResultResponse.newBuilder() + .setState(GetProcedureResultResponse.State.FINISHED).build()); + return null; + } + }).when(masterStub).getProcedureResult(any(HBaseRpcController.class), + any(GetProcedureResultRequest.class), any()); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + RpcCallback done = invocation.getArgument(2); + done.run(CreateTableResponse.newBuilder().setProcId(1L).build()); + return null; + } + }).when(masterStub).createTable(any(HBaseRpcController.class), any(CreateTableRequest.class), + any()); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + RpcCallback done = invocation.getArgument(2); + done.run(ShutdownResponse.getDefaultInstance()); + return null; + } + }).when(masterStub).shutdown(any(HBaseRpcController.class), any(ShutdownRequest.class), any()); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + RpcCallback done = invocation.getArgument(2); + done.run(StopMasterResponse.getDefaultInstance()); + return null; + } + }).when(masterStub).stopMaster(any(HBaseRpcController.class), any(StopMasterRequest.class), + any()); + + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + RpcCallback done = invocation.getArgument(2); + done.run(StopServerResponse.getDefaultInstance()); + return null; + } + }).when(adminStub).stopServer(any(HBaseRpcController.class), any(StopServerRequest.class), + any()); + + conn = new AsyncConnectionImpl(CONF, new DoNothingAsyncRegistry(CONF), "test", + UserProvider.instantiate(CONF).getCurrent()) { + + @Override + CompletableFuture getMasterStub() { + return CompletableFuture.completedFuture(masterStub); + } + + @Override + Interface getAdminStub(ServerName serverName) throws IOException { + return adminStub; + } + }; + } + + private HBaseRpcController assertPriority(int priority) { + return argThat(new ArgumentMatcher() { + + @Override + public boolean matches(HBaseRpcController controller) { + return controller.getPriority() == priority; + } + }); + } + + @Test + public void testCreateNormalTable() { + conn.getAdmin() + .createTable(TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).build()) + .join(); + verify(masterStub, times(1)).createTable(assertPriority(NORMAL_QOS), + any(CreateTableRequest.class), any()); + } + + // a bit strange as we can not do this in production but anyway, just a client mock to confirm + // that we pass the correct priority + @Test + public void testCreateSystemTable() { + conn.getAdmin() + .createTable(TableDescriptorBuilder + .newBuilder(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).build()) + .join(); + verify(masterStub, times(1)).createTable(assertPriority(SYSTEMTABLE_QOS), + any(CreateTableRequest.class), any()); + } + + // a bit strange as we can not do this in production but anyway, just a client mock to confirm + // that we pass the correct priority + @Test + public void testCreateMetaTable() { + conn.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TableName.META_TABLE_NAME) + .setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).build()).join(); + verify(masterStub, times(1)).createTable(assertPriority(META_QOS), + any(CreateTableRequest.class), any()); + } + + @Test + public void testShutdown() { + conn.getAdmin().shutdown().join(); + verify(masterStub, times(1)).shutdown(assertPriority(HIGH_QOS), any(ShutdownRequest.class), + any()); + } + + @Test + public void testStopMaster() { + conn.getAdmin().stopMaster().join(); + verify(masterStub, times(1)).stopMaster(assertPriority(HIGH_QOS), any(StopMasterRequest.class), + any()); + } + + @Test + public void testStopRegionServer() { + conn.getAdmin().stopRegionServer(ServerName.valueOf("rs", 16010, 12345)).join(); + verify(adminStub, times(1)).stopServer(assertPriority(HIGH_QOS), any(StopServerRequest.class), + any()); + } +} diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java new file mode 100644 index 0000000000..c195812d14 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java @@ -0,0 +1,554 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.HConstants.META_QOS; +import static org.apache.hadoop.hbase.HConstants.NORMAL_QOS; +import static org.apache.hadoop.hbase.HConstants.SYSTEMTABLE_QOS; +import static org.apache.hadoop.hbase.NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR; +import static org.junit.Assert.assertNotNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.Cell.Type; +import org.apache.hadoop.hbase.CellBuilderFactory; +import org.apache.hadoop.hbase.CellBuilderType; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.ArgumentMatcher; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionActionResult; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ResultOrException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; + +/** + * Confirm that we will set the priority in {@link HBaseRpcController} for several table operations. + */ +@Category({ ClientTests.class, MediumTests.class }) +public class TestAsyncTableRpcPriority { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncTableRpcPriority.class); + + private static Configuration CONF = HBaseConfiguration.create(); + + private ClientService.Interface stub; + + private AsyncConnection conn; + + @Rule + public TestName name = new TestName(); + + @Before + public void setUp() throws IOException { + stub = mock(ClientService.Interface.class); + AtomicInteger scanNextCalled = new AtomicInteger(0); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + ScanRequest req = invocation.getArgument(1); + RpcCallback done = invocation.getArgument(2); + if (!req.hasScannerId()) { + done.run(ScanResponse.newBuilder().setScannerId(1).setTtl(800) + .setMoreResultsInRegion(true).setMoreResults(true).build()); + } else { + if (req.hasCloseScanner() && req.getCloseScanner()) { + done.run(ScanResponse.getDefaultInstance()); + } else { + Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put) + .setRow(Bytes.toBytes(scanNextCalled.incrementAndGet())) + .setFamily(Bytes.toBytes("cf")).setQualifier(Bytes.toBytes("cq")) + .setValue(Bytes.toBytes("v")).build(); + Result result = Result.create(Arrays.asList(cell)); + done.run( + ScanResponse.newBuilder().setScannerId(1).setTtl(800).setMoreResultsInRegion(true) + .setMoreResults(true).addResults(ProtobufUtil.toResult(result)).build()); + } + } + return null; + } + }).when(stub).scan(any(HBaseRpcController.class), any(ScanRequest.class), any()); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + ClientProtos.MultiResponse resp = + ClientProtos.MultiResponse.newBuilder() + .addRegionActionResult(RegionActionResult.newBuilder().addResultOrException( + ResultOrException.newBuilder().setResult(ProtobufUtil.toResult(new Result())))) + .build(); + RpcCallback done = invocation.getArgument(2); + done.run(resp); + return null; + } + }).when(stub).multi(any(HBaseRpcController.class), any(ClientProtos.MultiRequest.class), any()); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + MutationProto req = ((MutateRequest) invocation.getArgument(1)).getMutation(); + MutateResponse resp; + switch (req.getMutateType()) { + case INCREMENT: + ColumnValue value = req.getColumnValue(0); + QualifierValue qvalue = value.getQualifierValue(0); + Cell cell = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setType(Type.Put) + .setRow(req.getRow().toByteArray()).setFamily(value.getFamily().toByteArray()) + .setQualifier(qvalue.getQualifier().toByteArray()) + .setValue(qvalue.getValue().toByteArray()).build(); + resp = MutateResponse.newBuilder() + .setResult(ProtobufUtil.toResult(Result.create(Arrays.asList(cell)))).build(); + break; + default: + resp = MutateResponse.getDefaultInstance(); + break; + } + RpcCallback done = invocation.getArgument(2); + done.run(resp); + return null; + } + }).when(stub).mutate(any(HBaseRpcController.class), any(MutateRequest.class), any()); + doAnswer(new Answer() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + RpcCallback done = invocation.getArgument(2); + done.run(GetResponse.getDefaultInstance()); + return null; + } + }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any()); + conn = new AsyncConnectionImpl(CONF, new DoNothingAsyncRegistry(CONF), "test", + UserProvider.instantiate(CONF).getCurrent()) { + + @Override + AsyncRegionLocator getLocator() { + AsyncRegionLocator locator = mock(AsyncRegionLocator.class); + Answer> answer = + new Answer>() { + + @Override + public CompletableFuture answer(InvocationOnMock invocation) + throws Throwable { + TableName tableName = invocation.getArgument(0); + RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); + ServerName serverName = ServerName.valueOf("rs", 16010, 12345); + HRegionLocation loc = new HRegionLocation(info, serverName); + return CompletableFuture.completedFuture(loc); + } + }; + doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), + any(RegionLocateType.class), anyLong()); + doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), + anyInt(), any(RegionLocateType.class), anyLong()); + return locator; + } + + @Override + ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { + return stub; + } + }; + } + + private HBaseRpcController assertPriority(int priority) { + return argThat(new ArgumentMatcher() { + + @Override + public boolean matches(HBaseRpcController controller) { + return controller.getPriority() == priority; + } + }); + } + + @Test + public void testGet() { + conn.getTable(TableName.valueOf(name.getMethodName())) + .get(new Get(Bytes.toBytes(0)).setPriority(11)).join(); + verify(stub, times(1)).get(assertPriority(11), any(GetRequest.class), any()); + } + + @Test + public void testGetNormalTable() { + conn.getTable(TableName.valueOf(name.getMethodName())).get(new Get(Bytes.toBytes(0))).join(); + verify(stub, times(1)).get(assertPriority(NORMAL_QOS), any(GetRequest.class), any()); + } + + @Test + public void testGetSystemTable() { + conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) + .get(new Get(Bytes.toBytes(0))).join(); + verify(stub, times(1)).get(assertPriority(SYSTEMTABLE_QOS), any(GetRequest.class), any()); + } + + @Test + public void testGetMetaTable() { + conn.getTable(TableName.META_TABLE_NAME).get(new Get(Bytes.toBytes(0))).join(); + verify(stub, times(1)).get(assertPriority(META_QOS), any(GetRequest.class), any()); + } + + @Test + public void testPut() { + conn + .getTable(TableName.valueOf(name.getMethodName())).put(new Put(Bytes.toBytes(0)) + .setPriority(12).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) + .join(); + verify(stub, times(1)).mutate(assertPriority(12), any(MutateRequest.class), any()); + } + + @Test + public void testPutNormalTable() { + conn.getTable(TableName.valueOf(name.getMethodName())).put(new Put(Bytes.toBytes(0)) + .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); + verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testPutSystemTable() { + conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) + .put(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), + Bytes.toBytes("v"))) + .join(); + verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testPutMetaTable() { + conn.getTable(TableName.META_TABLE_NAME).put(new Put(Bytes.toBytes(0)) + .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); + verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testDelete() { + conn.getTable(TableName.valueOf(name.getMethodName())) + .delete(new Delete(Bytes.toBytes(0)).setPriority(13)).join(); + verify(stub, times(1)).mutate(assertPriority(13), any(MutateRequest.class), any()); + } + + @Test + public void testDeleteNormalTable() { + conn.getTable(TableName.valueOf(name.getMethodName())).delete(new Delete(Bytes.toBytes(0))) + .join(); + verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testDeleteSystemTable() { + conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) + .delete(new Delete(Bytes.toBytes(0))).join(); + verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testDeleteMetaTable() { + conn.getTable(TableName.META_TABLE_NAME).delete(new Delete(Bytes.toBytes(0))).join(); + verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testAppend() { + conn + .getTable(TableName.valueOf(name.getMethodName())).append(new Append(Bytes.toBytes(0)) + .setPriority(14).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) + .join(); + verify(stub, times(1)).mutate(assertPriority(14), any(MutateRequest.class), any()); + } + + @Test + public void testAppendNormalTable() { + conn.getTable(TableName.valueOf(name.getMethodName())).append(new Append(Bytes.toBytes(0)) + .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); + verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testAppendSystemTable() { + conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) + .append(new Append(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), + Bytes.toBytes("v"))) + .join(); + verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testAppendMetaTable() { + conn.getTable(TableName.META_TABLE_NAME).append(new Append(Bytes.toBytes(0)) + .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))).join(); + verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testIncrement() { + conn.getTable(TableName.valueOf(name.getMethodName())).increment(new Increment(Bytes.toBytes(0)) + .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).setPriority(15)).join(); + verify(stub, times(1)).mutate(assertPriority(15), any(MutateRequest.class), any()); + } + + @Test + public void testIncrementNormalTable() { + conn.getTable(TableName.valueOf(name.getMethodName())) + .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join(); + verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testIncrementSystemTable() { + conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) + .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join(); + verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testIncrementMetaTable() { + conn.getTable(TableName.META_TABLE_NAME) + .incrementColumnValue(Bytes.toBytes(0), Bytes.toBytes("cf"), Bytes.toBytes("cq"), 1).join(); + verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testCheckAndPut() { + conn.getTable(TableName.valueOf(name.getMethodName())) + .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) + .ifNotExists() + .thenPut(new Put(Bytes.toBytes(0)) + .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v")).setPriority(16)) + .join(); + verify(stub, times(1)).mutate(assertPriority(16), any(MutateRequest.class), any()); + } + + @Test + public void testCheckAndPutNormalTable() { + conn.getTable(TableName.valueOf(name.getMethodName())) + .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) + .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), + Bytes.toBytes("cq"), Bytes.toBytes("v"))) + .join(); + verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testCheckAndPutSystemTable() { + conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) + .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) + .ifNotExists().thenPut(new Put(Bytes.toBytes(0)).addColumn(Bytes.toBytes("cf"), + Bytes.toBytes("cq"), Bytes.toBytes("v"))) + .join(); + verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testCheckAndPutMetaTable() { + conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) + .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0)) + .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) + .join(); + verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testCheckAndDelete() { + conn.getTable(TableName.valueOf(name.getMethodName())) + .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) + .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0)).setPriority(17)).join(); + verify(stub, times(1)).mutate(assertPriority(17), any(MutateRequest.class), any()); + } + + @Test + public void testCheckAndDeleteNormalTable() { + conn.getTable(TableName.valueOf(name.getMethodName())) + .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) + .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join(); + verify(stub, times(1)).mutate(assertPriority(NORMAL_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testCheckAndDeleteSystemTable() { + conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) + .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) + .ifEquals(Bytes.toBytes("v")).thenDelete(new Delete(Bytes.toBytes(0))).join(); + verify(stub, times(1)).mutate(assertPriority(SYSTEMTABLE_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testCheckAndDeleteMetaTable() { + conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) + .qualifier(Bytes.toBytes("cq")).ifNotExists().thenPut(new Put(Bytes.toBytes(0)) + .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cq"), Bytes.toBytes("v"))) + .join(); + verify(stub, times(1)).mutate(assertPriority(META_QOS), any(MutateRequest.class), any()); + } + + @Test + public void testCheckAndMutate() throws IOException { + conn.getTable(TableName.valueOf(name.getMethodName())) + .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) + .ifEquals(Bytes.toBytes("v")).thenMutate(new RowMutations(Bytes.toBytes(0)) + .add((Mutation) new Delete(Bytes.toBytes(0)).setPriority(18))) + .join(); + verify(stub, times(1)).multi(assertPriority(18), any(ClientProtos.MultiRequest.class), any()); + } + + @Test + public void testCheckAndMutateNormalTable() throws IOException { + conn.getTable(TableName.valueOf(name.getMethodName())) + .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) + .ifEquals(Bytes.toBytes("v")) + .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0)))) + .join(); + verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class), + any()); + } + + @Test + public void testCheckAndMutateSystemTable() throws IOException { + conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) + .checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")).qualifier(Bytes.toBytes("cq")) + .ifEquals(Bytes.toBytes("v")) + .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0)))) + .join(); + verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS), + any(ClientProtos.MultiRequest.class), any()); + } + + @Test + public void testCheckAndMutateMetaTable() throws IOException { + conn.getTable(TableName.META_TABLE_NAME).checkAndMutate(Bytes.toBytes(0), Bytes.toBytes("cf")) + .qualifier(Bytes.toBytes("cq")).ifEquals(Bytes.toBytes("v")) + .thenMutate(new RowMutations(Bytes.toBytes(0)).add((Mutation) new Delete(Bytes.toBytes(0)))) + .join(); + verify(stub, times(1)).multi(assertPriority(META_QOS), any(ClientProtos.MultiRequest.class), + any()); + } + + @Test + public void testScan() throws IOException, InterruptedException { + try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName())) + .getScanner(new Scan().setCaching(1).setMaxResultSize(1).setPriority(19))) { + assertNotNull(scanner.next()); + Thread.sleep(1000); + } + Thread.sleep(1000); + // open, next, several renew lease, and then close + verify(stub, atLeast(4)).scan(assertPriority(19), any(ScanRequest.class), any()); + } + + @Test + public void testScanNormalTable() throws IOException, InterruptedException { + try (ResultScanner scanner = conn.getTable(TableName.valueOf(name.getMethodName())) + .getScanner(new Scan().setCaching(1).setMaxResultSize(1))) { + assertNotNull(scanner.next()); + Thread.sleep(1000); + } + Thread.sleep(1000); + // open, next, several renew lease, and then close + verify(stub, atLeast(4)).scan(assertPriority(NORMAL_QOS), any(ScanRequest.class), any()); + } + + @Test + public void testScanSystemTable() throws IOException, InterruptedException { + try (ResultScanner scanner = + conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) + .getScanner(new Scan().setCaching(1).setMaxResultSize(1))) { + assertNotNull(scanner.next()); + Thread.sleep(1000); + } + Thread.sleep(1000); + // open, next, several renew lease, and then close + verify(stub, atLeast(4)).scan(assertPriority(SYSTEMTABLE_QOS), any(ScanRequest.class), any()); + } + + @Test + public void testScanMetaTable() throws IOException, InterruptedException { + try (ResultScanner scanner = conn.getTable(TableName.META_TABLE_NAME) + .getScanner(new Scan().setCaching(1).setMaxResultSize(1))) { + assertNotNull(scanner.next()); + Thread.sleep(1000); + } + Thread.sleep(1000); + // open, next, several renew lease, and then close + verify(stub, atLeast(4)).scan(assertPriority(META_QOS), any(ScanRequest.class), any()); + } + + @Test + public void testBatchNormalTable() { + conn.getTable(TableName.valueOf(name.getMethodName())) + .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); + verify(stub, times(1)).multi(assertPriority(NORMAL_QOS), any(ClientProtos.MultiRequest.class), + any()); + } + + @Test + public void testBatchSystemTable() { + conn.getTable(TableName.valueOf(SYSTEM_NAMESPACE_NAME_STR, name.getMethodName())) + .batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join(); + verify(stub, times(1)).multi(assertPriority(SYSTEMTABLE_QOS), + any(ClientProtos.MultiRequest.class), any()); + } + + @Test + public void testBatchMetaTable() { + conn.getTable(TableName.META_TABLE_NAME).batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))) + .join(); + verify(stub, times(1)).multi(assertPriority(META_QOS), any(ClientProtos.MultiRequest.class), + any()); + } +} -- 2.17.1