From 46c028c6680662190cf7c9e80f9db3ec66acdec8 Mon Sep 17 00:00:00 2001 From: meiyi Date: Fri, 7 Dec 2018 16:06:29 +0800 Subject: [PATCH] HBASE-21159 Add shell command to switch throttle on or off --- .../java/org/apache/hadoop/hbase/client/Admin.java | 7 + .../org/apache/hadoop/hbase/client/AsyncAdmin.java | 7 + .../hadoop/hbase/client/AsyncHBaseAdmin.java | 5 + .../hbase/client/ConnectionImplementation.java | 6 + .../org/apache/hadoop/hbase/client/HBaseAdmin.java | 13 ++ .../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 14 ++ .../hbase/client/ShortCircuitMasterConnection.java | 8 ++ .../apache/hadoop/hbase/quotas/QuotaTableUtil.java | 7 +- .../src/main/protobuf/Master.proto | 10 ++ .../src/main/protobuf/MasterProcedure.proto | 14 ++ .../hadoop/hbase/coprocessor/MasterObserver.java | 18 +++ .../apache/hadoop/hbase/executor/EventType.java | 6 + .../apache/hadoop/hbase/executor/ExecutorType.java | 3 +- .../hadoop/hbase/master/MasterCoprocessorHost.java | 19 +++ .../hadoop/hbase/master/MasterRpcServices.java | 13 ++ .../procedure/RefreshRpcThrottleProcedure.java | 155 +++++++++++++++++++++ .../master/procedure/ServerProcedureInterface.java | 2 +- .../hadoop/hbase/master/procedure/ServerQueue.java | 2 + .../procedure/SwitchRpcThrottleProcedure.java | 151 ++++++++++++++++++++ .../hadoop/hbase/quotas/MasterQuotaManager.java | 25 ++++ .../org/apache/hadoop/hbase/quotas/QuotaCache.java | 18 +++ .../org/apache/hadoop/hbase/quotas/QuotaUtil.java | 18 +++ .../hbase/quotas/RegionServerRpcQuotaManager.java | 35 ++++- .../hadoop/hbase/regionserver/HRegionServer.java | 2 + .../regionserver/RefreshRpcThrottleCallable.java | 47 +++++++ .../hbase/security/access/AccessController.java | 6 + .../hbase/client/TestAsyncQuotaAdminApi.java | 7 + .../apache/hadoop/hbase/quotas/TestQuotaAdmin.java | 46 ++++++ .../security/access/TestAccessController.java | 13 ++ hbase-shell/src/main/ruby/hbase/quotas.rb | 4 + hbase-shell/src/main/ruby/shell.rb | 2 + .../ruby/shell/commands/disable_rpc_throttle.rb | 40 ++++++ .../ruby/shell/commands/enable_rpc_throttle.rb | 40 ++++++ hbase-shell/src/test/ruby/hbase/quotas_test.rb | 8 ++ 34 files changed, 766 insertions(+), 5 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RefreshRpcThrottleProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshRpcThrottleCallable.java create mode 100644 hbase-shell/src/main/ruby/shell/commands/disable_rpc_throttle.rb create mode 100644 hbase-shell/src/main/ruby/shell/commands/enable_rpc_throttle.rb diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index 08b44c9..3b2e8297 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -2796,4 +2796,11 @@ public interface Admin extends Abortable, Closeable { */ void cloneTableSchema(final TableName tableName, final TableName newTableName, final boolean preserveSplits) throws IOException; + + /** + * Turn the rpc throttle on or off. + * @param on Set to true to enable, false to disable. + * @return Previous quota throttle value + */ + boolean switchRpcThrottle(final boolean on) throws IOException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index 6bb253a..24a7be7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -1287,4 +1287,11 @@ public interface AsyncAdmin { */ CompletableFuture> compactionSwitch(boolean switchState, List serverNamesList); + + /** + * Turn the quota throttle on or off. + * @param on Set to true to enable, false to disable. + * @return Previous quota throttle value + */ + CompletableFuture switchRpcThrottle(boolean on); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 39eda07..94320cb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -765,4 +765,9 @@ class AsyncHBaseAdmin implements AsyncAdmin { List serverNamesList) { return wrap(rawAdmin.compactionSwitch(switchState, serverNamesList)); } + + @Override + public CompletableFuture switchRpcThrottle(boolean on) { + return wrap(rawAdmin.switchRpcThrottle(on)); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index da6b592..6ceec8b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1757,6 +1757,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable { TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException { return stub.transitReplicationPeerSyncReplicationState(controller, request); } + + @Override + public MasterProtos.SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller, + MasterProtos.SwitchRpcThrottleRequest request) throws ServiceException { + return stub.switchRpcThrottle(controller, request); + } }; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 45961ff..e219944 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -4341,4 +4341,17 @@ public class HBaseAdmin implements Admin { createTable(htd); } } + + @Override + public boolean switchRpcThrottle(final boolean throttleOn) throws IOException { + return executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { + @Override + protected Boolean rpcCall() throws Exception { + return this.master + .switchRpcThrottle(getRpcController(), + MasterProtos.SwitchRpcThrottleRequest.newBuilder().setThrottleOn(throttleOn).build()) + .getPreviousThrottleOn(); + } + }); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 4f73909..bfe081b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -243,6 +243,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; @@ -3635,4 +3637,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { resp -> ProtobufUtil.toCacheEvictionStats(resp.getStats()))) .serverName(serverName).call(); } + + @Override + public CompletableFuture switchRpcThrottle(boolean throttleOn) { + CompletableFuture future = this. newMasterCaller() + .action((controller, stub) -> this + . call(controller, stub, + SwitchRpcThrottleRequest.newBuilder().setThrottleOn(throttleOn).build(), + (s, c, req, done) -> s.switchRpcThrottle(c, req, done), + resp -> resp.getPreviousThrottleOn())) + .call(); + return future; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java index 7bb65d2..166a931 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java @@ -146,6 +146,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; @@ -647,4 +649,10 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection { TransitReplicationPeerSyncReplicationStateRequest request) throws ServiceException { return stub.transitReplicationPeerSyncReplicationState(controller, request); } + + @Override + public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller, + SwitchRpcThrottleRequest request) throws ServiceException { + return stub.switchRpcThrottle(controller, request); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java index 419091d..07c140b7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaTableUtil.java @@ -57,7 +57,6 @@ import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.protobuf.ProtobufMagic; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; -import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; @@ -108,6 +107,8 @@ public class QuotaTableUtil { protected static final byte[] QUOTA_USER_ROW_KEY_PREFIX = Bytes.toBytes("u."); protected static final byte[] QUOTA_TABLE_ROW_KEY_PREFIX = Bytes.toBytes("t."); protected static final byte[] QUOTA_NAMESPACE_ROW_KEY_PREFIX = Bytes.toBytes("n."); + protected static final byte[] RPC_THROTTLE_ENABLED_ROW_KEY = + Bytes.toBytes("rpc_throttle_enabled"); /* ========================================================================= * Quota "settings" helpers @@ -340,6 +341,10 @@ public class QuotaTableUtil { parseTableResult(result, visitor); } else if (isUserRowKey(row)) { parseUserResult(result, visitor); + } else if (Bytes.equals(row, RPC_THROTTLE_ENABLED_ROW_KEY)) { + if (LOG.isDebugEnabled()) { + LOG.debug("ignore rpc_throttle_enabled row-key"); + } } else { LOG.warn("unexpected row-key: " + Bytes.toString(row)); } diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index 7f6513c..6cfe023 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -635,6 +635,14 @@ message ClearDeadServersResponse { repeated ServerName server_name = 1; } +message SwitchRpcThrottleRequest { + required bool throttle_on = 1; +} + +message SwitchRpcThrottleResponse { + required bool previous_throttle_on = 1; +} + service MasterService { /** Used by the client to get the number of regions that have received the updated schema */ rpc GetSchemaAlterStatus(GetSchemaAlterStatusRequest) @@ -992,6 +1000,8 @@ service MasterService { rpc ClearDeadServers(ClearDeadServersRequest) returns(ClearDeadServersResponse); + /** Turn the quota throttle on or off */ + rpc SwitchRpcThrottle (SwitchRpcThrottleRequest) returns (SwitchRpcThrottleResponse); } // HBCK Service definitions. diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index cc0c6ba..9fceff5 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -551,3 +551,17 @@ message OpenRegionProcedureStateData { message CloseRegionProcedureStateData { optional ServerName assign_candidate = 1; } + +enum SwitchRpcThrottleState { + SWITCH_RPC_THROTTLE_UPDATE_QUOTA_TABLE = 1; + SWITCH_RPC_THROTTLE_REFRESH_RS = 2; + SWITCH_RPC_THROTTLE_POST = 3; +} + +message SwitchRpcThrottleStateData { + required bool throttle_on = 1; +} + +message RefreshRpcThrottleParameter { + required ServerName target_server = 1; +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java index a0863e4..2a8ae17 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MasterObserver.java @@ -1517,4 +1517,22 @@ public interface MasterObserver { */ default void postRecommissionRegionServer(ObserverContext ctx, ServerName server, List encodedRegionNames) throws IOException {} + + /** + * Called before switching rpc throttle on or off. + * @param ctx the coprocessor instance's environment + */ + default void preSwitchRpcThrottle(final ObserverContext ctx, + final boolean newValue) throws IOException { + } + + /** + * Called after switching rpc throttle on or off. + * @param ctx the coprocessor instance's environment + * @param oldValue the previously rpc throttle value + * @param newValue the newly rpc throttle value + */ + default void postSwitchRpcThrottle(final ObserverContext ctx, + final boolean oldValue, final boolean newValue) throws IOException { + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index ad38d1c..7a48140 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -140,6 +140,12 @@ public enum EventType { * Master asking RS to open a priority region. */ M_RS_OPEN_PRIORITY_REGION (26, ExecutorType.RS_OPEN_PRIORITY_REGION), + /** + * Messages originating from Master to RS.
+ * M_RS_REFRESH_RPC_THROTTLE
+ * Master asking RS to switch rpc throttle state. + */ + M_RS_REFRESH_RPC_THROTTLE(27, ExecutorType.RS_REFRESH_RPC_THROTTLE), /** * Messages originating from Client to Master.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index ea97354..41f7add 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -48,7 +48,8 @@ public enum ExecutorType { RS_COMPACTED_FILES_DISCHARGER (29), RS_OPEN_PRIORITY_REGION (30), RS_REFRESH_PEER(31), - RS_REPLAY_SYNC_REPLICATION_WAL(32); + RS_REPLAY_SYNC_REPLICATION_WAL(32), + RS_REFRESH_RPC_THROTTLE(33); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java index 51e30c4..7ea72f9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterCoprocessorHost.java @@ -1771,4 +1771,23 @@ public class MasterCoprocessorHost } }); } + + public void preSwitchRpcThrottle(boolean throttleOn) throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null :new MasterObserverOperation() { + @Override + public void call(MasterObserver observer) throws IOException { + observer.preSwitchRpcThrottle(this, throttleOn); + } + }); + } + + public void postSwitchRpcThrottle(final boolean oldValue, final boolean newValue) + throws IOException { + execOperation(coprocEnvironments.isEmpty() ? null : new MasterObserverOperation() { + @Override + public void call(MasterObserver observer) throws IOException { + observer.postSwitchRpcThrottle(this, oldValue, newValue); + } + }); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index cd838d5..b9c0ec0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -258,6 +258,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; @@ -2478,6 +2480,17 @@ public class MasterRpcServices extends RSRpcServices } } + @Override + public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller, + SwitchRpcThrottleRequest request) throws ServiceException { + try { + master.checkInitialized(); + return master.getMasterQuotaManager().switchRpcThrottle(request); + } catch (Exception e) { + throw new ServiceException(e); + } + } + private boolean containMetaWals(ServerName serverName) throws IOException { Path logDir = new Path(master.getWALRootDir(), AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RefreshRpcThrottleProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RefreshRpcThrottleProcedure.java new file mode 100644 index 0000000..da04e35 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RefreshRpcThrottleProcedure.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.replication.regionserver.RefreshRpcThrottleCallable; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshRpcThrottleParameter; + +@InterfaceAudience.Private +public class RefreshRpcThrottleProcedure extends Procedure + implements RemoteProcedure, ServerProcedureInterface { + + private static final Logger LOG = LoggerFactory.getLogger(RefreshRpcThrottleProcedure.class); + private ServerName targetServer; + + public RefreshRpcThrottleProcedure() { + } + + public RefreshRpcThrottleProcedure(ServerName serverName) { + this.targetServer = serverName; + } + + private boolean dispatched; + private ProcedureEvent event; + private boolean succ; + + @Override + protected Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (succ) { + return null; + } + dispatched = false; + } + try { + env.getRemoteDispatcher().addOperationToNode(targetServer, this); + } catch (FailedRemoteDispatchException frde) { + LOG.info("Can not add remote operation for refreshing throttle on {}, ", targetServer, frde); + return null; + } + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + RefreshRpcThrottleParameter.newBuilder() + .setTargetServer(ProtobufUtil.toServerName(targetServer)).build(); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + RefreshRpcThrottleParameter data = serializer.deserialize(RefreshRpcThrottleParameter.class); + targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + } + + @Override + public RemoteProcedureDispatcher.RemoteOperation + remoteCallBuild(MasterProcedureEnv masterProcedureEnv, ServerName remote) { + assert targetServer.equals(remote); + return new RSProcedureDispatcher.ServerOperation(this, getProcId(), + RefreshRpcThrottleCallable.class, RefreshRpcThrottleParameter.newBuilder() + .setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray()); + } + + @Override + public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, + IOException exception) { + complete(env, exception); + } + + @Override + public void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, null); + } + + @Override + public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { + complete(env, error); + } + + @Override + public ServerName getServerName() { + return targetServer; + } + + @Override + public boolean hasMetaTableRegion() { + return false; + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.REFRESH_RPC_THROTTLE_STATE; + } + + private void complete(MasterProcedureEnv env, Throwable error) { + if (error != null) { + this.succ = false; + } else { + this.succ = true; + } + event.wake(env.getProcedureScheduler()); + event = null; + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + sb.append(" server="); + sb.append(targetServer); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java index f3c10ef..76bd30e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java @@ -27,7 +27,7 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public interface ServerProcedureInterface { public enum ServerOperationType { - CRASH_HANDLER + CRASH_HANDLER, REFRESH_RPC_THROTTLE_STATE } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java index 3a1b3c4..9e8420b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java @@ -35,6 +35,8 @@ class ServerQueue extends Queue { switch (spi.getServerOperationType()) { case CRASH_HANDLER: return true; + case REFRESH_RPC_THROTTLE_STATE: + return false; default: break; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java new file mode 100644 index 0000000..84fe9aa --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchRpcThrottleProcedure.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.quotas.QuotaUtil; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchRpcThrottleStateData; + +@InterfaceAudience.Private +public class SwitchRpcThrottleProcedure + extends StateMachineProcedure + implements ServerProcedureInterface { + + private static Logger LOG = LoggerFactory.getLogger(SwitchRpcThrottleProcedure.class); + + boolean throttleOn; + ProcedurePrepareLatch syncLatch; + ServerName serverName; + + public SwitchRpcThrottleProcedure() { + } + + public SwitchRpcThrottleProcedure(boolean throttleOn, ServerName serverName, + final ProcedurePrepareLatch syncLatch) { + this.syncLatch = syncLatch; + this.throttleOn = throttleOn; + this.serverName = serverName; + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, SwitchRpcThrottleState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + try { + switch (state) { + case SWITCH_RPC_THROTTLE_UPDATE_QUOTA_TABLE: + setThrottleState(env, throttleOn); + setNextState(SwitchRpcThrottleState.SWITCH_RPC_THROTTLE_REFRESH_RS); + return Flow.HAS_MORE_STATE; + case SWITCH_RPC_THROTTLE_REFRESH_RS: + RefreshRpcThrottleProcedure[] subProcedures = env.getMasterServices() + .getServerManager().getOnlineServersList().stream() + .map(sn -> new RefreshRpcThrottleProcedure(sn)) + .toArray(RefreshRpcThrottleProcedure[]::new); + addChildProcedure(subProcedures); + setNextState(SwitchRpcThrottleState.SWITCH_RPC_THROTTLE_POST); + return Flow.HAS_MORE_STATE; + case SWITCH_RPC_THROTTLE_POST: + ProcedurePrepareLatch.releaseLatch(syncLatch, this); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } catch (IOException e) { + LOG.warn("Retriable error trying to switch rpc throttle state={} (in state={})", throttleOn, + state, e); + ProcedurePrepareLatch.releaseLatch(syncLatch, this); + } + return Flow.HAS_MORE_STATE; + } + + @Override + protected void rollbackState(MasterProcedureEnv env, SwitchRpcThrottleState state) + throws IOException, InterruptedException { + } + + @Override + protected SwitchRpcThrottleState getState(int stateId) { + return SwitchRpcThrottleState.forNumber(stateId); + } + + @Override + protected int getStateId(SwitchRpcThrottleState throttleState) { + return throttleState.getNumber(); + } + + @Override + protected SwitchRpcThrottleState getInitialState() { + return SwitchRpcThrottleState.SWITCH_RPC_THROTTLE_UPDATE_QUOTA_TABLE; + } + + @Override + protected SwitchRpcThrottleState getCurrentState() { + return super.getCurrentState(); + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize(SwitchRpcThrottleStateData.newBuilder().setThrottleOn(throttleOn).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + SwitchRpcThrottleStateData data = serializer.deserialize(SwitchRpcThrottleStateData.class); + throttleOn = data.getThrottleOn(); + } + + @Override + public ServerName getServerName() { + return serverName; + } + + @Override + public boolean hasMetaTableRegion() { + return false; + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.REFRESH_RPC_THROTTLE_STATE; + } + + public void setThrottleState(MasterProcedureEnv env, boolean throttleOn) throws IOException{ + QuotaUtil.setRpcThrottleState(env.getMasterServices().getConnection(), throttleOn); + } + + @Override + public void toStringClassDetails(StringBuilder sb) { + sb.append(getClass().getSimpleName()); + sb.append(" server="); + sb.append(serverName); + sb.append(", rpcThrottleOn="); + sb.append(throttleOn); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java index bdeab80..8a3492a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java @@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; +import org.apache.hadoop.hbase.master.procedure.SwitchRpcThrottleProcedure; import org.apache.hadoop.hbase.namespace.NamespaceAuditor; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; @@ -54,6 +56,8 @@ import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchRpcThrottleResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize; @@ -310,6 +314,27 @@ public class MasterQuotaManager implements RegionStateListener { } } + public SwitchRpcThrottleResponse switchRpcThrottle(SwitchRpcThrottleRequest request) + throws IOException { + if (initialized) { + boolean throttleOn = request.getThrottleOn(); + masterServices.getMasterCoprocessorHost().preSwitchRpcThrottle(throttleOn); + boolean oldStatus = QuotaUtil.getRpcThrottleState(masterServices.getConnection()); + + ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch(); + SwitchRpcThrottleProcedure procedure = + new SwitchRpcThrottleProcedure(throttleOn, masterServices.getServerName(), latch); + masterServices.getMasterProcedureExecutor().submitProcedure(procedure); + latch.await(); + SwitchRpcThrottleResponse response = + SwitchRpcThrottleResponse.newBuilder().setPreviousThrottleOn(oldStatus).build(); + masterServices.getMasterCoprocessorHost().postSwitchRpcThrottle(oldStatus, throttleOn); + return response; + } else { + return SwitchRpcThrottleResponse.newBuilder().setPreviousThrottleOn(false).build(); + } + } + private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps) throws IOException, InterruptedException { if (req.hasRemoveAll() && req.getRemoveAll() == true) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java index 0664cc5..3ae32bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java @@ -69,6 +69,7 @@ public class QuotaCache implements Stoppable { private final ConcurrentHashMap namespaceQuotaCache = new ConcurrentHashMap<>(); private final ConcurrentHashMap tableQuotaCache = new ConcurrentHashMap<>(); private final ConcurrentHashMap userQuotaCache = new ConcurrentHashMap<>(); + private boolean rpcThrottleOnUseful = false; private final RegionServerServices rsServices; private QuotaRefresherChore refreshChore; @@ -180,6 +181,11 @@ public class QuotaCache implements Stoppable { return userQuotaCache; } + @VisibleForTesting + boolean isRpcThrottleOnUseful() { + return rpcThrottleOnUseful; + } + // TODO: Remove this once we have the notification bus private class QuotaRefresherChore extends ScheduledChore { private long lastUpdate = 0; @@ -204,6 +210,7 @@ public class QuotaCache implements Stoppable { } } + fetchRpcThrottleOn(); fetchNamespaceQuotaState(); fetchTableQuotaState(); fetchUserQuotaState(); @@ -256,6 +263,17 @@ public class QuotaCache implements Stoppable { } }); } + + private void fetchRpcThrottleOn() { + if (!rpcThrottleOnUseful && tableQuotaCache.size() > 0) { + try { + QuotaUtil.getRpcThrottleState(rsServices.getConnection()); + rpcThrottleOnUseful = true; + } catch (IOException e) { + rpcThrottleOnUseful = false; + } + } + } private void fetch(final String type, final ConcurrentHashMap quotasMap, final Fetcher fetcher) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java index f6b5d95..affb0a6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java @@ -90,6 +90,24 @@ public class QuotaUtil extends QuotaTableUtil { /* ========================================================================= * Quota "settings" helpers */ + public static void setRpcThrottleState(final Connection connection, final boolean isThrottleOn) + throws IOException { + Put put = new Put(RPC_THROTTLE_ENABLED_ROW_KEY); + put.addColumn(QUOTA_FAMILY_INFO, null, Bytes.toBytes(isThrottleOn)); + doPut(connection, put); + } + + public static boolean getRpcThrottleState(final Connection connection) throws IOException { + Get get = new Get(RPC_THROTTLE_ENABLED_ROW_KEY); + get.addColumn(QUOTA_FAMILY_INFO, null); + Result result = doGet(connection, get); + if (result != null && result.value() != null) { + return Bytes.toBoolean(result.value()); + } + LOG.info("Unable to get rpc_throttle_enabled from quota table and return true by default"); + return true; + } + public static void addTableQuota(final Connection connection, final TableName table, final Quotas data) throws IOException { addQuotas(connection, getTableRowKey(table), data); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index 40e70dc..412550a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -52,6 +52,10 @@ public class RegionServerRpcQuotaManager { private final RegionServerServices rsServices; private QuotaCache quotaCache = null; + private boolean isRpcThrottleOn = true; + // the flag used to show if isRpcThrottleOn is get from quota table(useful) + // or set true by default(unuseful). If not useful, get from quota table next time. + private boolean isRpcThrottleOnUseful = false; public RegionServerRpcQuotaManager(final RegionServerServices rsServices) { this.rsServices = rsServices; @@ -76,11 +80,38 @@ public class RegionServerRpcQuotaManager { } } - public boolean isQuotaEnabled() { + private boolean isQuotaEnabled() { return quotaCache != null; } @VisibleForTesting + protected boolean isRpcThrottleOn() { + if (!isRpcThrottleOnUseful && quotaCache.isRpcThrottleOnUseful()) { + innerRefreshThrottleState(); + } + return isRpcThrottleOn; + } + + private void innerRefreshThrottleState() { + try { + refreshThrottleState(); + } catch (Exception e) { + LOG.error("Failed to get rpc throttle state", e); + isRpcThrottleOnUseful = false; + } + } + + public void refreshThrottleState() throws Exception { + if (isQuotaEnabled()) { + boolean previousRpcThrottleOn = isRpcThrottleOn; + isRpcThrottleOn = QuotaUtil.getRpcThrottleState(rsServices.getConnection()); + LOG.info("Refresh rpc throttle state, previous state is {} and current state is {}", + previousRpcThrottleOn, isRpcThrottleOn); + isRpcThrottleOnUseful = true; + } + } + + @VisibleForTesting QuotaCache getQuotaCache() { return quotaCache; } @@ -93,7 +124,7 @@ public class RegionServerRpcQuotaManager { * @return the OperationQuota */ public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) { - if (isQuotaEnabled() && !table.isSystemTable()) { + if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleOn()) { UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi); QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table); boolean useNoop = userLimiter.isBypass(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 6242d36..2ce283a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1960,6 +1960,8 @@ public class HRegionServer extends HasThread implements conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2)); this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL, conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1)); + this.executorService.startExecutorService(ExecutorType.RS_REFRESH_RPC_THROTTLE, + conf.getInt("hbase.regionserver.executor.refresh.rpc.peer.threads", 1)); Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", uncaughtExceptionHandler); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshRpcThrottleCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshRpcThrottleCallable.java new file mode 100644 index 0000000..9ac2b7a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshRpcThrottleCallable.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The callable executed at RS side to refresh rpc throttle state.
+ */ +@InterfaceAudience.Private +public class RefreshRpcThrottleCallable implements RSProcedureCallable { + private HRegionServer rs; + + @Override + public Void call() throws Exception { + rs.getRegionServerRpcQuotaManager().refreshThrottleState(); + return null; + } + + @Override + public void init(byte[] parameter, HRegionServer rs) { + this.rs = rs; + } + + @Override + public EventType getEventType() { + return EventType.M_RS_REFRESH_RPC_THROTTLE; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index 835fc0d..d9606d8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -2581,6 +2581,12 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor, checkSystemOrSuperUser(getActiveUser(ctx)); } + @Override + public void preSwitchRpcThrottle(ObserverContext ctx, + boolean newValue) throws IOException { + requirePermission(ctx, "switchRpcThrottle", Action.ADMIN); + } + /** * Returns the active user to which authorization checks should be applied. * If we are in the context of an RPC call, the remote user is used, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java index fc8a0ca..3b03398 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncQuotaAdminApi.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HConstants; @@ -178,6 +179,12 @@ public class TestAsyncQuotaAdminApi extends TestAsyncAdminBase { assertNumResults(0, null); } + @Test + public void testSwitchRpcThrottle() throws Exception { + CompletableFuture future = ASYNC_CONN.getAdmin().switchRpcThrottle(true); + assertEquals(true, future.get().booleanValue()); + } + private void assertNumResults(int expected, final QuotaFilter filter) throws Exception { assertEquals(expected, countResults(filter)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java index 03e0aa5..6e621c2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.quotas; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -47,8 +48,10 @@ import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -520,6 +523,49 @@ public class TestQuotaAdmin { } + @Test + public void testRpcThrottleStateWhenStartup() throws IOException, InterruptedException { + Admin admin = TEST_UTIL.getAdmin(); + // at least one non system table to trigger refresh rpc throttle state + TableName testTable = TableName.valueOf("testRpcThrottleStateWhenStartup"); + TEST_UTIL.createTable(testTable, Bytes.toBytes("A")); + admin.switchRpcThrottle(false); + + TEST_UTIL.killMiniHBaseCluster(); + TEST_UTIL.startMiniHBaseCluster(); + TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); + TEST_UTIL.waitTableAvailable(testTable); + for (JVMClusterUtil.RegionServerThread rs : TEST_UTIL.getHBaseCluster() + .getRegionServerThreads()) { + RegionServerRpcQuotaManager quotaManager = + rs.getRegionServer().getRegionServerRpcQuotaManager(); + quotaManager.getQuotaCache().triggerCacheRefresh(); + Thread.sleep(1000); + assertTrue(quotaManager.getQuotaCache().isRpcThrottleOnUseful()); + assertFalse(quotaManager.isRpcThrottleOn()); + } + // enable rpc throttle + TEST_UTIL.getAdmin().switchRpcThrottle(true); + } + + @Test + public void testSwitchRpcThrottle() throws IOException { + Admin admin = TEST_UTIL.getAdmin(); + testSwitchRpcThrottle(admin, true, true); + testSwitchRpcThrottle(admin, true, false); + testSwitchRpcThrottle(admin, false, false); + testSwitchRpcThrottle(admin, false, true); + } + + private void testSwitchRpcThrottle(Admin admin, boolean oldThrottleState, + boolean currentThrottleState) throws IOException { + boolean state = admin.switchRpcThrottle(currentThrottleState); + Assert.assertEquals(oldThrottleState, state); + TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() + .forEach(rs -> Assert.assertEquals(currentThrottleState, + rs.getRegionServer().getRegionServerRpcQuotaManager().isRpcThrottleOn())); + } + private void verifyRecordPresentInQuotaTable(ThrottleType type, long limit, TimeUnit tu) throws Exception { // Verify the RPC Quotas in the table diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index 1b70054..10c71a6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -3418,6 +3418,19 @@ public class TestAccessController extends SecureTestUtil { } } + @Test + public void testSwitchRpcThrottle() throws Exception { + AccessTestAction action = new AccessTestAction() { + @Override + public Object run() throws Exception { + ACCESS_CONTROLLER.preSwitchRpcThrottle(ObserverContextImpl.createAndPrepare(CP_ENV), true); + return null; + } + }; + verifyAllowed(action, SUPERUSER, USER_ADMIN); + verifyDenied(action, USER_CREATE, USER_RW, USER_RO, USER_NONE, USER_OWNER); + } + /* * Validate Global User ACL */ diff --git a/hbase-shell/src/main/ruby/hbase/quotas.rb b/hbase-shell/src/main/ruby/hbase/quotas.rb index 1ba9594..7ffb10b 100644 --- a/hbase-shell/src/main/ruby/hbase/quotas.rb +++ b/hbase-shell/src/main/ruby/hbase/quotas.rb @@ -243,6 +243,10 @@ module Hbase QuotaTableUtil.getObservedSnapshotSizes(@admin.getConnection) end + def switch_rpc_throttle(enableDisable) + @admin.switchRpcThrottle(java.lang.Boolean.valueOf(enableDisable)) + end + def _parse_size(str_limit) str_limit = str_limit.downcase match = /(\d+)([bkmgtp%]*)/.match(str_limit) diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 1507ca3..1338d58 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -433,6 +433,8 @@ Shell.load_command_group( list_quota_table_sizes list_quota_snapshots list_snapshot_sizes + enable_rpc_throttle + disable_rpc_throttle ] ) diff --git a/hbase-shell/src/main/ruby/shell/commands/disable_rpc_throttle.rb b/hbase-shell/src/main/ruby/shell/commands/disable_rpc_throttle.rb new file mode 100644 index 0000000..ff0ebe0 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/disable_rpc_throttle.rb @@ -0,0 +1,40 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class DisableRpcThrottle < Command + def help + return <<-EOF +Turn quota throttle off. Returns previous throttle state. +NOTE: if quota is not enabled, this will not work. + +Examples: + hbase> disable_rpc_throttle +EOF + end + + def command() + prev_state = quotas_admin.switch_rpc_throttle(false) ? 'true' : 'false' + formatter.row(["Previous rpc throttle state : #{prev_state}"]) + prev_state + end + end + end +end diff --git a/hbase-shell/src/main/ruby/shell/commands/enable_rpc_throttle.rb b/hbase-shell/src/main/ruby/shell/commands/enable_rpc_throttle.rb new file mode 100644 index 0000000..9c74938 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/enable_rpc_throttle.rb @@ -0,0 +1,40 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class EnableRpcThrottle < Command + def help + return <<-EOF +Turn quota throttle on. Returns previous throttle state. +NOTE: if quota is not enabled, this will not work. + +Examples: + hbase> enable_rpc_throttle +EOF + end + + def command() + prev_state = quotas_admin.switch_rpc_throttle(true) ? 'true' : 'false' + formatter.row(["Previous rpc throttle state : #{prev_state}"]) + prev_state + end + end + end +end diff --git a/hbase-shell/src/test/ruby/hbase/quotas_test.rb b/hbase-shell/src/test/ruby/hbase/quotas_test.rb index be6b238..afca760 100644 --- a/hbase-shell/src/test/ruby/hbase/quotas_test.rb +++ b/hbase-shell/src/test/ruby/hbase/quotas_test.rb @@ -163,5 +163,13 @@ module Hbase output = capture_stdout{ command(:list_quotas) } assert(output.include?('0 row(s)')) end + + define_test 'switch rpc throttle' do + output = capture_stdout{command(:disable_rpc_throttle)} + assert(output.include?('Previous rpc throttle state : true')) + + output = capture_stdout{command(:enable_rpc_throttle)} + assert(output.include?('Previous rpc throttle state : false')) + end end end -- 2.7.4