From e7bcd64c6810570be6bea05accd5feeec8cd05b4 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Sat, 16 Feb 2019 15:10:13 +0800 Subject: [PATCH] HBASE-21907 Should set priority for rpc request --- .../AsyncAdminRequestRetryingCaller.java | 6 +- .../client/AsyncBatchRpcRetryingCaller.java | 10 +- .../AsyncMasterRequestRpcRetryingCaller.java | 4 +- .../hbase/client/AsyncRpcRetryingCaller.java | 11 +- .../client/AsyncRpcRetryingCallerFactory.java | 111 ++++++++++++--- ...syncScanSingleRegionRpcRetryingCaller.java | 20 +-- .../AsyncServerRequestRpcRetryingCaller.java | 5 +- .../AsyncSingleRequestRpcRetryingCaller.java | 4 +- .../hadoop/hbase/client/ConnectionUtils.java | 34 ++++- .../hbase/client/RawAsyncHBaseAdmin.java | 133 ++++++++++-------- .../hbase/client/RawAsyncTableImpl.java | 29 ++-- 11 files changed, 259 insertions(+), 108 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java index 02e22c052a..ce0fca7eb0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java @@ -43,11 +43,11 @@ public class AsyncAdminRequestRetryingCaller extends AsyncRpcRetryingCaller callable; private ServerName serverName; - public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, + public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { - super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java index 4051e1d34f..78bfa0c7fe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.CellUtil.createCellScanner; import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS; import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime; +import static org.apache.hadoop.hbase.client.ConnectionUtils.*; import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController; import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException; import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent; @@ -45,6 +46,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.hadoop.hbase.CellScannable; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; @@ -129,6 +131,11 @@ class AsyncBatchRpcRetryingCaller { computeIfAbsent(actionsByRegion, loc.getRegion().getRegionName(), () -> new RegionRequest(loc)).actions.add(action); } + + public int getPriority() { + return actionsByRegion.values().stream().flatMap(rr -> rr.actions.stream()) + .mapToInt(Action::getPriority).max().orElse(HConstants.PRIORITY_UNSET); + } } public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, @@ -341,7 +348,8 @@ class AsyncBatchRpcRetryingCaller { return; } HBaseRpcController controller = conn.rpcControllerFactory.newController(); - resetController(controller, Math.min(rpcTimeoutNs, remainingNs)); + resetController(controller, Math.min(rpcTimeoutNs, remainingNs), + calculatePriority(serverReq.getPriority(), tableName)); if (!cells.isEmpty()) { controller.setCellScanner(createCellScanner(cells)); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java index 7ed44e269b..e5594cbac7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java @@ -42,9 +42,9 @@ public class AsyncMasterRequestRpcRetryingCaller extends AsyncRpcRetryingCall private final Callable callable; public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, - Callable callable, long pauseNs, int maxRetries, long operationTimeoutNs, + Callable callable, int priority, long pauseNs, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { - super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs, + super(retryTimer, conn, priority, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java index a886b49c9e..45266e9be2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java @@ -52,6 +52,8 @@ public abstract class AsyncRpcRetryingCaller { private final Timer retryTimer; + private final int priority; + private final long startNs; private final long pauseNs; @@ -74,10 +76,12 @@ public abstract class AsyncRpcRetryingCaller { protected final HBaseRpcController controller; - public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs, - int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, int priority, + long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, + int startLogErrorsCnt) { this.retryTimer = retryTimer; this.conn = conn; + this.priority = priority; this.pauseNs = pauseNs; this.maxAttempts = maxAttempts; this.operationTimeoutNs = operationTimeoutNs; @@ -85,6 +89,7 @@ public abstract class AsyncRpcRetryingCaller { this.startLogErrorsCnt = startLogErrorsCnt; this.future = new CompletableFuture<>(); this.controller = conn.rpcControllerFactory.newController(); + this.controller.setPriority(priority); this.exceptions = new ArrayList<>(); this.startNs = System.nanoTime(); } @@ -113,7 +118,7 @@ public abstract class AsyncRpcRetryingCaller { } else { callTimeoutNs = rpcTimeoutNs; } - resetController(controller, callTimeoutNs); + resetController(controller, callTimeoutNs, priority); } private void tryScheduleRetry(Throwable error, Consumer updateCachedLocation) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java index f019fc460a..9914009f93 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java @@ -17,10 +17,14 @@ */ package org.apache.hadoop.hbase.client; +import static org.apache.hadoop.hbase.HConstants.META_QOS; +import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET; +import static org.apache.hadoop.hbase.client.ConnectionUtils.calculatePriority; import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument; import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull; +import java.io.IOException; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -29,7 +33,10 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.io.netty.util.Timer; @@ -43,6 +50,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRespon @InterfaceAudience.Private class AsyncRpcRetryingCallerFactory { + private static final Logger LOG = LoggerFactory.getLogger(AsyncRpcRetryingCallerFactory.class); + private final AsyncConnectionImpl conn; private final Timer retryTimer; @@ -77,6 +86,8 @@ class AsyncRpcRetryingCallerFactory { private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID; + private int priority = PRIORITY_UNSET; + public SingleRequestCallerBuilder table(TableName tableName) { this.tableName = tableName; return this; @@ -128,12 +139,25 @@ class AsyncRpcRetryingCallerFactory { return this; } - public AsyncSingleRequestRpcRetryingCaller build() { + public SingleRequestCallerBuilder priority(OperationWithAttributes op) { + this.priority = op.getPriority(); + return this; + } + + private void preCheck() { checkArgument(replicaId >= 0, "invalid replica id %s", replicaId); - return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, - checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"), replicaId, - checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"), - pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + checkNotNull(tableName, "tableName is null"); + checkNotNull(row, "row is null"); + checkNotNull(locateType, "locateType is null"); + checkNotNull(callable, "action is null"); + this.priority = calculatePriority(priority, tableName); + } + + public AsyncSingleRequestRpcRetryingCaller build() { + preCheck(); + return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn, tableName, row, replicaId, + locateType, callable, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); } /** @@ -175,6 +199,8 @@ class AsyncRpcRetryingCallerFactory { private long rpcTimeoutNs; + private int priority = PRIORITY_UNSET; + public ScanSingleRegionCallerBuilder id(long scannerId) { this.scannerId = scannerId; return this; @@ -182,6 +208,7 @@ class AsyncRpcRetryingCallerFactory { public ScanSingleRegionCallerBuilder setScan(Scan scan) { this.scan = scan; + this.priority = scan.getPriority(); return this; } @@ -246,14 +273,22 @@ class AsyncRpcRetryingCallerFactory { return this; } - public AsyncScanSingleRegionRpcRetryingCaller build() { + private void preCheck() { checkArgument(scannerId != null, "invalid scannerId %d", scannerId); - return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, - checkNotNull(scan, "scan is null"), scanMetrics, scannerId, - checkNotNull(resultCache, "resultCache is null"), - checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"), - checkNotNull(loc, "location is null"), isRegionServerRemote, scannerLeaseTimeoutPeriodNs, - pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); + checkNotNull(scan, "scan is null"); + checkNotNull(resultCache, "resultCache is null"); + checkNotNull(consumer, "consumer is null"); + checkNotNull(stub, "stub is null"); + checkNotNull(loc, "location is null"); + this.priority = calculatePriority(priority, loc.getRegion().getTable()); + } + + public AsyncScanSingleRegionRpcRetryingCaller build() { + preCheck(); + return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn, scan, scanMetrics, + scannerId, resultCache, consumer, stub, loc, isRegionServerRemote, priority, + scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs, + startLogErrorsCnt); } /** @@ -338,6 +373,8 @@ class AsyncRpcRetryingCallerFactory { private long rpcTimeoutNs = -1L; + private int priority = PRIORITY_UNSET; + public MasterRequestCallerBuilder action( AsyncMasterRequestRpcRetryingCaller.Callable callable) { this.callable = callable; @@ -369,10 +406,43 @@ class AsyncRpcRetryingCallerFactory { return this; } + public MasterRequestCallerBuilder priority(TableName tableName) { + this.priority = Math.max(priority, ConnectionUtils.getPriority(tableName)); + return this; + } + + public MasterRequestCallerBuilder priority(byte[] regionNameOrEncodedRegionName) { + try { + if (RegionInfo.isEncodedRegionName(regionNameOrEncodedRegionName)) { + if (Bytes.equals(regionNameOrEncodedRegionName, + RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes())) { + this.priority = Math.max(priority, META_QOS); + } + } else { + this.priority = Math.max(priority, + ConnectionUtils.getPriority(RegionInfo.getTable(regionNameOrEncodedRegionName))); + } + } catch (IOException e) { + // not a big deal but let's warn it + LOG.warn("Failed to parse region name {}", + Bytes.toStringBinary(regionNameOrEncodedRegionName), e); + } + return this; + } + + public MasterRequestCallerBuilder priority(int priority) { + this.priority = Math.max(this.priority, priority); + return this; + } + + private void preCheck() { + checkNotNull(callable, "action is null"); + } + public AsyncMasterRequestRpcRetryingCaller build() { - return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, - checkNotNull(callable, "action is null"), pauseNs, maxAttempts, operationTimeoutNs, - rpcTimeoutNs, startLogErrorsCnt); + preCheck(); + return new AsyncMasterRequestRpcRetryingCaller(retryTimer, conn, callable, priority, + pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); } /** @@ -398,6 +468,8 @@ class AsyncRpcRetryingCallerFactory { private ServerName serverName; + private int priority; + public AdminRequestCallerBuilder action( AsyncAdminRequestRetryingCaller.Callable callable) { this.callable = callable; @@ -434,9 +506,14 @@ class AsyncRpcRetryingCallerFactory { return this; } + public AdminRequestCallerBuilder priority(int priority) { + this.priority = priority; + return this; + } + public AsyncAdminRequestRetryingCaller build() { - return new AsyncAdminRequestRetryingCaller(retryTimer, conn, pauseNs, maxAttempts, - operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, + return new AsyncAdminRequestRetryingCaller(retryTimer, conn, priority, pauseNs, + maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt, checkNotNull(serverName, "serverName is null"), checkNotNull(callable, "action is null")); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java index 584bfacf76..96961afc65 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java @@ -91,6 +91,8 @@ class AsyncScanSingleRegionRpcRetryingCaller { private final boolean regionServerRemote; + private final int priority; + private final long scannerLeaseTimeoutPeriodNs; private final long pauseNs; @@ -298,11 +300,11 @@ class AsyncScanSingleRegionRpcRetryingCaller { } } - public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, - AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId, - ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub, - HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs, - long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { + public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, + Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache resultCache, + AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc, + boolean isRegionServerRemote, int priority, long scannerLeaseTimeoutPeriodNs, long pauseNs, + int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { this.retryTimer = retryTimer; this.scan = scan; this.scanMetrics = scanMetrics; @@ -324,7 +326,9 @@ class AsyncScanSingleRegionRpcRetryingCaller { completeWhenNoMoreResultsInRegion = this::completeWhenNoMoreResultsInRegion; } this.future = new CompletableFuture<>(); + this.priority = priority; this.controller = conn.rpcControllerFactory.newController(); + this.controller.setPriority(priority); this.exceptions = new ArrayList<>(); } @@ -338,7 +342,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private void closeScanner() { incRPCCallsMetrics(scanMetrics, regionServerRemote); - resetController(controller, rpcTimeoutNs); + resetController(controller, rpcTimeoutNs, priority); ScanRequest req = RequestConverter.buildScanRequest(this.scannerId, 0, true, false); stub.scan(controller, req, resp -> { if (controller.failed()) { @@ -558,7 +562,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { if (tries > 1) { incRPCRetriesMetrics(scanMetrics, regionServerRemote); } - resetController(controller, callTimeoutNs); + resetController(controller, callTimeoutNs, priority); ScanRequest req = RequestConverter.buildScanRequest(scannerId, scan.getCaching(), false, nextCallSeq, false, false, scan.getLimit()); stub.scan(controller, req, resp -> onComplete(controller, resp)); @@ -575,7 +579,7 @@ class AsyncScanSingleRegionRpcRetryingCaller { private void renewLease() { incRPCCallsMetrics(scanMetrics, regionServerRemote); nextCallSeq++; - resetController(controller, rpcTimeoutNs); + resetController(controller, rpcTimeoutNs, priority); ScanRequest req = RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq, false, true, -1); stub.scan(controller, req, resp -> { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java index f114eff5ba..63c85c210e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import java.io.IOException; import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.yetus.audience.InterfaceAudience; @@ -47,8 +48,8 @@ public class AsyncServerRequestRpcRetryingCaller extends AsyncRpcRetryingCall public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName, Callable callable) { - super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, - startLogErrorsCnt); + super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, maxAttempts, operationTimeoutNs, + rpcTimeoutNs, startLogErrorsCnt); this.serverName = serverName; this.callable = callable; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java index 9490d0f0b6..9b0dede5ed 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java @@ -56,9 +56,9 @@ class AsyncSingleRequestRpcRetryingCaller extends AsyncRpcRetryingCaller { public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, TableName tableName, byte[] row, int replicaId, RegionLocateType locateType, - Callable callable, long pauseNs, int maxAttempts, long operationTimeoutNs, + Callable callable, int priority, long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) { - super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, + super(retryTimer, conn, priority, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt); this.tableName = tableName; this.row = row; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 8e050df93d..9a000bb82b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -298,12 +298,13 @@ public final class ConnectionUtils { return Bytes.equals(row, EMPTY_END_ROW); } - static void resetController(HBaseRpcController controller, long timeoutNs) { + static void resetController(HBaseRpcController controller, long timeoutNs, int priority) { controller.reset(); if (timeoutNs >= 0) { controller.setCallTimeout( (int) Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(timeoutNs))); } + controller.setPriority(priority); } static Throwable translateException(Throwable t) { @@ -572,4 +573,35 @@ public final class ConnectionUtils { }); return future; } + + /** + * Select the priority for the rpc call. + *

+ * The rules are: + *

    + *
  1. If user set a priority explicitly, then just use it.
  2. + *
  3. For meta table, use {@link HConstants#META_QOS}.
  4. + *
  5. For other system table, use {@link HConstants#SYSTEMTABLE_QOS}.
  6. + *
  7. For other tables, use {@link HConstants#NORMAL_QOS}.
  8. + *
+ * @param priority the priority set by user, can be {@link HConstants#PRIORITY_UNSET}. + * @param tableName the table we operate on + */ + static int calculatePriority(int priority, TableName tableName) { + if (priority != HConstants.PRIORITY_UNSET) { + return priority; + } else { + return getPriority(tableName); + } + } + + static int getPriority(TableName tableName) { + if (TableName.isMetaTableName(tableName)) { + return HConstants.META_QOS; + } else if (tableName.isSystemTable()) { + return HConstants.SYSTEMTABLE_QOS; + } else { + return HConstants.NORMAL_QOS; + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index d4b60fb8a3..fd7edd96cd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.apache.hadoop.hbase.util.FutureUtils.addListener; import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException; +import static org.apache.hadoop.hbase.HConstants.HIGH_QOS; import com.google.protobuf.Message; import com.google.protobuf.RpcChannel; @@ -38,6 +39,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.regex.Pattern; @@ -393,7 +395,6 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private CompletableFuture adminCall(HBaseRpcController controller, AdminService.Interface stub, PREQ preq, AdminRpcCall rpcCall, Converter respConverter) { - CompletableFuture future = new CompletableFuture<>(); rpcCall.call(stub, controller, preq, new RpcCallback() { @@ -416,9 +417,30 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private CompletableFuture procedureCall(PREQ preq, MasterRpcCall rpcCall, Converter respConverter, ProcedureBiConsumer consumer) { - CompletableFuture procFuture = - this. newMasterCaller().action((controller, stub) -> this - . call(controller, stub, preq, rpcCall, respConverter)).call(); + return procedureCall(b -> { + }, preq, rpcCall, respConverter, consumer); + } + + private CompletableFuture procedureCall(TableName tableName, PREQ preq, + MasterRpcCall rpcCall, Converter respConverter, + ProcedureBiConsumer consumer) { + return procedureCall(b -> b.priority(tableName), preq, rpcCall, respConverter, consumer); + } + + private CompletableFuture procedureCall(byte[] regionName, PREQ preq, + MasterRpcCall rpcCall, Converter respConverter, + ProcedureBiConsumer consumer) { + return procedureCall(b -> b.priority(regionName), preq, rpcCall, respConverter, consumer); + } + + private CompletableFuture procedureCall( + Consumer> prioritySetter, PREQ preq, + MasterRpcCall rpcCall, Converter respConverter, + ProcedureBiConsumer consumer) { + MasterRequestCallerBuilder builder = this. newMasterCaller().action((controller, + stub) -> this. call(controller, stub, preq, rpcCall, respConverter)); + prioritySetter.accept(builder); + CompletableFuture procFuture = builder.call(); CompletableFuture future = waitProcedureResult(procFuture); addListener(future, consumer); return future; @@ -515,7 +537,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture getDescriptor(TableName tableName) { CompletableFuture future = new CompletableFuture<>(); - addListener(this.> newMasterCaller() + addListener(this.> newMasterCaller().priority(tableName) .action((controller, stub) -> this .> call( controller, stub, RequestConverter.buildGetTableDescriptorsRequest(tableName), @@ -566,14 +588,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { private CompletableFuture createTable(TableName tableName, CreateTableRequest request) { Preconditions.checkNotNull(tableName, "table name is null"); - return this. procedureCall(request, + return this. procedureCall(tableName, request, (s, c, req, done) -> s.createTable(c, req, done), (resp) -> resp.getProcId(), new CreateTableProcedureBiConsumer(tableName)); } @Override public CompletableFuture modifyTable(TableDescriptor desc) { - return this. procedureCall( + return this. procedureCall(desc.getTableName(), RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done), (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName())); @@ -581,15 +603,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture deleteTable(TableName tableName) { - return this. procedureCall(RequestConverter - .buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), + return this. procedureCall(tableName, + RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.deleteTable(c, req, done), (resp) -> resp.getProcId(), new DeleteTableProcedureBiConsumer(tableName)); } @Override public CompletableFuture truncateTable(TableName tableName, boolean preserveSplits) { - return this. procedureCall( + return this. procedureCall(tableName, RequestConverter.buildTruncateTableRequest(tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.truncateTable(c, req, done), (resp) -> resp.getProcId(), new TruncateTableProcedureBiConsumer(tableName)); @@ -597,16 +619,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture enableTable(TableName tableName) { - return this. procedureCall(RequestConverter - .buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), + return this. procedureCall(tableName, + RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.enableTable(c, req, done), (resp) -> resp.getProcId(), new EnableTableProcedureBiConsumer(tableName)); } @Override public CompletableFuture disableTable(TableName tableName) { - return this. procedureCall(RequestConverter - .buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), + return this. procedureCall(tableName, + RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.disableTable(c, req, done), (resp) -> resp.getProcId(), new DisableTableProcedureBiConsumer(tableName)); } @@ -725,7 +747,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture addColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) { - return this. procedureCall( + return this. procedureCall(tableName, RequestConverter.buildAddColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.addColumn(c, req, done), (resp) -> resp.getProcId(), new AddColumnFamilyProcedureBiConsumer(tableName)); @@ -733,7 +755,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture deleteColumnFamily(TableName tableName, byte[] columnFamily) { - return this. procedureCall( + return this. procedureCall(tableName, RequestConverter.buildDeleteColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.deleteColumn(c, req, done), (resp) -> resp.getProcId(), new DeleteColumnFamilyProcedureBiConsumer(tableName)); @@ -742,7 +764,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture modifyColumnFamily(TableName tableName, ColumnFamilyDescriptor columnFamily) { - return this. procedureCall( + return this. procedureCall(tableName, RequestConverter.buildModifyColumnRequest(tableName, columnFamily, ng.getNonceGroup(), ng.newNonce()), (s, c, req, done) -> s.modifyColumn(c, req, done), (resp) -> resp.getProcId(), new ModifyColumnFamilyProcedureBiConsumer(tableName)); @@ -1226,9 +1248,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { } addListener( - this. procedureCall(request, - (s, c, req, done) -> s.mergeTableRegions(c, req, done), (resp) -> resp.getProcId(), - new MergeTableRegionProcedureBiConsumer(tableName)), + this. procedureCall(nameOfRegionA, + request, (s, c, req, done) -> s.mergeTableRegions(c, req, done), + (resp) -> resp.getProcId(), new MergeTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); @@ -1403,9 +1425,11 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return future; } - addListener(this. procedureCall(request, - (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(), - new SplitTableRegionProcedureBiConsumer(tableName)), (ret, err2) -> { + addListener( + this. procedureCall(hri.getTable(), + request, (s, c, req, done) -> s.splitRegion(c, req, done), (resp) -> resp.getProcId(), + new SplitTableRegionProcedureBiConsumer(tableName)), + (ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); } else { @@ -1423,7 +1447,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { future.completeExceptionally(err); return; } - addListener(this. newMasterCaller() + addListener(this. newMasterCaller().priority(regionName) .action(((controller, stub) -> this. call( controller, stub, RequestConverter.buildAssignRegionRequest(regionInfo.getRegionName()), (s, c, req, done) -> s.assignRegion(c, req, done), resp -> null))) @@ -1447,7 +1471,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return; } addListener( - this. newMasterCaller() + this. newMasterCaller().priority(regionName) .action(((controller, stub) -> this . call(controller, stub, RequestConverter.buildUnassignRegionRequest(regionInfo.getRegionName(), forcible), @@ -1473,7 +1497,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return; } addListener( - this. newMasterCaller() + this. newMasterCaller().priority(regionName) .action(((controller, stub) -> this . call(controller, stub, RequestConverter.buildOfflineRegionRequest(regionInfo.getRegionName()), @@ -1499,7 +1523,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return; } addListener( - moveRegion( + moveRegion(regionName, RequestConverter.buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), null)), (ret, err2) -> { if (err2 != null) { @@ -1522,8 +1546,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { future.completeExceptionally(err); return; } - addListener(moveRegion(RequestConverter - .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), + addListener( + moveRegion(regionName, RequestConverter + .buildMoveRegionRequest(regionInfo.getEncodedNameAsBytes(), destServerName)), (ret, err2) -> { if (err2 != null) { future.completeExceptionally(err2); @@ -1535,12 +1560,12 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { return future; } - private CompletableFuture moveRegion(MoveRegionRequest request) { - return this - . newMasterCaller() - .action( - (controller, stub) -> this. call(controller, - stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)).call(); + private CompletableFuture moveRegion(byte[] regionName, MoveRegionRequest request) { + return this. newMasterCaller().priority(regionName) + .action( + (controller, stub) -> this. call(controller, + stub, request, (s, c, req, done) -> s.moveRegion(c, req, done), resp -> null)) + .call(); } @Override @@ -2704,35 +2729,31 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture shutdown() { - return this - . newMasterCaller() - .action( - (controller, stub) -> this. call(controller, - stub, ShutdownRequest.newBuilder().build(), - (s, c, req, done) -> s.shutdown(c, req, done), resp -> null)).call(); + return this. newMasterCaller().priority(HIGH_QOS) + .action((controller, stub) -> this. call(controller, + stub, ShutdownRequest.newBuilder().build(), (s, c, req, done) -> s.shutdown(c, req, done), + resp -> null)) + .call(); } @Override public CompletableFuture stopMaster() { - return this - . newMasterCaller() - .action( - (controller, stub) -> this. call(controller, - stub, StopMasterRequest.newBuilder().build(), - (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)).call(); + return this. newMasterCaller().priority(HIGH_QOS) + .action((controller, stub) -> this. call( + controller, stub, StopMasterRequest.newBuilder().build(), + (s, c, req, done) -> s.stopMaster(c, req, done), resp -> null)) + .call(); } @Override public CompletableFuture stopRegionServer(ServerName serverName) { - StopServerRequest request = - RequestConverter.buildStopServerRequest("Called by admin client " - + this.connection.toString()); - return this - . newAdminCaller() - .action( - (controller, stub) -> this. adminCall( - controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), - resp -> null)).serverName(serverName).call(); + StopServerRequest request = RequestConverter + .buildStopServerRequest("Called by admin client " + this.connection.toString()); + return this. newAdminCaller().priority(HIGH_QOS) + .action((controller, stub) -> this. adminCall( + controller, stub, request, (s, c, req, done) -> s.stopServer(controller, req, done), + resp -> null)) + .serverName(serverName).call(); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java index 7562e6fd6b..f5db8c1d0c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java @@ -205,7 +205,7 @@ class RawAsyncTableImpl implements AsyncTable { (info, src) -> reqConvert.convert(info, src, nonceGroup, nonce), respConverter); } - private SingleRequestCallerBuilder newCaller(byte[] row, long rpcTimeoutNs) { + private SingleRequestCallerBuilder newCaller(byte[] row, int priority, long rpcTimeoutNs) { return conn.callerFactory. single().table(tableName).row(row) .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS) .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS) @@ -213,12 +213,13 @@ class RawAsyncTableImpl implements AsyncTable { .startLogErrorsCnt(startLogErrorsCnt); } - private SingleRequestCallerBuilder newCaller(Row row, long rpcTimeoutNs) { - return newCaller(row.getRow(), rpcTimeoutNs); + private SingleRequestCallerBuilder newCaller( + R row, long rpcTimeoutNs) { + return newCaller(row.getRow(), row.getPriority(), rpcTimeoutNs); } private CompletableFuture get(Get get, int replicaId) { - return this. newCaller(get, readRpcTimeoutNs) + return this. newCaller(get, readRpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl . call(controller, loc, stub, get, RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done), @@ -235,7 +236,7 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture put(Put put) { - return this. newCaller(put, writeRpcTimeoutNs) + return this. newCaller(put, writeRpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, put, RequestConverter::buildMutateRequest)) .call(); @@ -243,7 +244,7 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture delete(Delete delete) { - return this. newCaller(delete, writeRpcTimeoutNs) + return this. newCaller(delete, writeRpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. voidMutate(controller, loc, stub, delete, RequestConverter::buildMutateRequest)) .call(); @@ -254,7 +255,7 @@ class RawAsyncTableImpl implements AsyncTable { checkHasFamilies(append); long nonceGroup = conn.getNonceGenerator().getNonceGroup(); long nonce = conn.getNonceGenerator().newNonce(); - return this. newCaller(append, rpcTimeoutNs) + return this. newCaller(append, rpcTimeoutNs) .action( (controller, loc, stub) -> this. noncedMutate(nonceGroup, nonce, controller, loc, stub, append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) @@ -266,7 +267,7 @@ class RawAsyncTableImpl implements AsyncTable { checkHasFamilies(increment); long nonceGroup = conn.getNonceGenerator().getNonceGroup(); long nonce = conn.getNonceGenerator().newNonce(); - return this. newCaller(increment, rpcTimeoutNs) + return this. newCaller(increment, rpcTimeoutNs) .action((controller, loc, stub) -> this. noncedMutate(nonceGroup, nonce, controller, loc, stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult)) @@ -327,7 +328,7 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture thenPut(Put put) { preCheck(); - return RawAsyncTableImpl.this. newCaller(row, rpcTimeoutNs) + return RawAsyncTableImpl.this. newCaller(row, put.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, loc, stub, put, (rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, @@ -339,7 +340,7 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture thenDelete(Delete delete) { preCheck(); - return RawAsyncTableImpl.this. newCaller(row, rpcTimeoutNs) + return RawAsyncTableImpl.this. newCaller(row, delete.getPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. mutate(controller, loc, stub, delete, (rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, @@ -351,7 +352,8 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture thenMutate(RowMutations mutation) { preCheck(); - return RawAsyncTableImpl.this. newCaller(mutation, rpcTimeoutNs) + return RawAsyncTableImpl.this + . newCaller(row, mutation.getMaxPriority(), rpcTimeoutNs) .action((controller, loc, stub) -> RawAsyncTableImpl. mutateRow(controller, loc, stub, mutation, (rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier, @@ -409,8 +411,9 @@ class RawAsyncTableImpl implements AsyncTable { @Override public CompletableFuture mutateRow(RowMutations mutation) { - return this. newCaller(mutation, writeRpcTimeoutNs).action((controller, loc, - stub) -> RawAsyncTableImpl. mutateRow(controller, loc, stub, mutation, (rn, rm) -> { + return this. newCaller(mutation.getRow(), mutation.getMaxPriority(), writeRpcTimeoutNs) + .action((controller, loc, stub) -> RawAsyncTableImpl. mutateRow(controller, loc, stub, + mutation, (rn, rm) -> { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm); regionMutationBuilder.setAtomic(true); return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build(); -- 2.17.1