From d2131b1ffb7e05285dedc4f3b7bf36f6d633d18e Mon Sep 17 00:00:00 2001 From: meiyi Date: Mon, 10 Sep 2018 10:11:45 +0800 Subject: [PATCH] HBASE-21159 Add shell command to switch throttle on or off --- .../java/org/apache/hadoop/hbase/client/Admin.java | 7 + .../hbase/client/ConnectionImplementation.java | 6 + .../org/apache/hadoop/hbase/client/HBaseAdmin.java | 13 ++ .../hbase/client/ShortCircuitMasterConnection.java | 8 ++ .../apache/hadoop/hbase/zookeeper/ZNodePaths.java | 6 +- .../src/main/protobuf/Master.proto | 11 ++ .../src/main/protobuf/MasterProcedure.proto | 14 ++ .../apache/hadoop/hbase/executor/EventType.java | 9 +- .../apache/hadoop/hbase/executor/ExecutorType.java | 3 +- .../hadoop/hbase/master/MasterRpcServices.java | 11 ++ .../master/procedure/RefreshThrottleProcedure.java | 151 +++++++++++++++++++++ .../master/procedure/SwitchThrottleProcedure.java | 148 ++++++++++++++++++++ .../hadoop/hbase/quotas/MasterQuotaManager.java | 24 ++++ .../hbase/quotas/RegionServerRpcQuotaManager.java | 25 +++- .../hadoop/hbase/regionserver/HRegionServer.java | 4 +- .../regionserver/RefreshThrottleCallable.java | 47 +++++++ .../apache/hadoop/hbase/quotas/TestQuotaAdmin.java | 19 +++ hbase-shell/src/main/ruby/hbase/quotas.rb | 4 + hbase-shell/src/main/ruby/shell.rb | 2 + .../main/ruby/shell/commands/disable_throttle.rb | 40 ++++++ .../main/ruby/shell/commands/enable_throttle.rb | 40 ++++++ .../hbase/zookeeper/ThrottleStateTracker.java | 50 +++++++ 22 files changed, 634 insertions(+), 8 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RefreshThrottleProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchThrottleProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshThrottleCallable.java create mode 100644 hbase-shell/src/main/ruby/shell/commands/disable_throttle.rb create mode 100644 hbase-shell/src/main/ruby/shell/commands/enable_throttle.rb create mode 100644 hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ThrottleStateTracker.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index a43a0b2..e4039ad 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -2790,4 +2790,11 @@ public interface Admin extends Abortable, Closeable { */ void cloneTableSchema(final TableName tableName, final TableName newTableName, final boolean preserveSplits) throws IOException; + + /** + * Turn the quota throttle on or off. + * @param throttleOn Set to true to enable, false to disable. + * @return Previous quota throttle value + */ + boolean switchThrottle(final boolean throttleOn) throws IOException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 67fe551..cb78d07 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1629,6 +1629,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } @Override + public MasterProtos.SwitchThrottleResponse switchThrottle(RpcController controller, + MasterProtos.SwitchThrottleRequest request) throws ServiceException { + return stub.switchThrottle(controller, request); + } + + @Override public MasterProtos.MajorCompactionTimestampResponse getLastMajorCompactionTimestamp( RpcController controller, MasterProtos.MajorCompactionTimestampRequest request) throws ServiceException { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index c54ff17..fdf35e0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -4343,4 +4343,17 @@ public class HBaseAdmin implements Admin { createTable(htd); } } + + @Override + public boolean switchThrottle(final boolean throttleOn) throws IOException { + return executeCallable(new MasterCallable(getConnection(), getRpcControllerFactory()) { + @Override + protected Boolean rpcCall() throws Exception { + return this.master + .switchThrottle(getRpcController(), + MasterProtos.SwitchThrottleRequest.newBuilder().setThrottleOn(throttleOn).build()) + .getPreviousThrottleOn(); + } + }); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java index 7bb65d2..7ae810e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ShortCircuitMasterConnection.java @@ -146,6 +146,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTable import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SplitTableRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.StopMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchThrottleRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchThrottleResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.TruncateTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.UnassignRegionRequest; @@ -227,6 +229,12 @@ public class ShortCircuitMasterConnection implements MasterKeepAliveConnection { } @Override + public SwitchThrottleResponse switchThrottle(RpcController controller, + SwitchThrottleRequest request) throws ServiceException { + return stub.switchThrottle(controller, request); + } + + @Override public SetNormalizerRunningResponse setNormalizerRunning(RpcController controller, SetNormalizerRunningRequest request) throws ServiceException { return stub.setNormalizerRunning(controller, request); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java index 32792f6..9a61d1d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZNodePaths.java @@ -89,6 +89,8 @@ public class ZNodePaths { public final String queuesZNode; // znode containing queues of hfile references to be replicated public final String hfileRefsZNode; + // znode containing the state of throttle + public final String throttleZNode; public ZNodePaths(Configuration conf) { baseZNode = conf.get(ZOOKEEPER_ZNODE_PARENT, DEFAULT_ZOOKEEPER_ZNODE_PARENT); @@ -123,6 +125,7 @@ public class ZNodePaths { queuesZNode = joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.rs", "rs")); hfileRefsZNode = joinZNode(replicationZNode, conf.get("zookeeper.znode.replication.hfile.refs", "hfile-refs")); + throttleZNode = joinZNode(baseZNode, conf.get("zookeeper.znode.throttle", "throttle")); } @Override @@ -136,7 +139,8 @@ public class ZNodePaths { + ", switchZNode=" + switchZNode + ", tableLockZNode=" + tableLockZNode + ", namespaceZNode=" + namespaceZNode + ", masterMaintZNode=" + masterMaintZNode + ", replicationZNode=" + replicationZNode + ", peersZNode=" + peersZNode - + ", queuesZNode=" + queuesZNode + ", hfileRefsZNode=" + hfileRefsZNode + "]"; + + ", queuesZNode=" + queuesZNode + ", hfileRefsZNode=" + hfileRefsZNode + + ", throttleZNode=" + throttleZNode + "]"; } /** diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index 69e0f32..a72bb35 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -577,6 +577,14 @@ message SetQuotaRequest { optional SpaceLimitRequest space_limit = 8; } +message SwitchThrottleRequest { + required bool throttleOn = 1; +} + +message SwitchThrottleResponse { + required bool previousThrottleOn = 1; +} + message SetQuotaResponse { } @@ -913,6 +921,9 @@ service MasterService { /** Apply the new quota settings */ rpc SetQuota(SetQuotaRequest) returns(SetQuotaResponse); + /** Turn the quota throttle on or off */ + rpc SwitchThrottle(SwitchThrottleRequest) returns(SwitchThrottleResponse); + /** Returns the timestamp of the last major compaction */ rpc getLastMajorCompactionTimestamp(MajorCompactionTimestampRequest) returns(MajorCompactionTimestampResponse); diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index e50a913..4b5a449 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -547,3 +547,17 @@ message OpenRegionProcedureStateData { message CloseRegionProcedureStateData { optional ServerName assign_candidate = 1; } + +enum SwitchThrottleState { + SWITCH_THROTTLE_UPDATE_ZK = 1; + SWITCH_THROTTLE_REFRESH_RS = 2; + SWITCH_THROTTLE_POST = 3; +} + +message SwitchThrottleStateData { + required bool throttle_on = 1; +} + +message RefreshThrottleParameter { + required ServerName target_server = 1; +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index ad38d1c..e60999a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -288,7 +288,14 @@ public enum EventType { * * RS_REPLAY_SYNC_REPLICATION_WAL */ - RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL); + RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL), + + /** + * RS refresh throttle.
+ * + * RS_REFRESH_THROTTLE + */ + RS_REFRESH_THROTTLE(86, ExecutorType.RS_REFRESH_THROTTLE); private final int code; private final ExecutorType executor; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index ea97354..58b74bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -48,7 +48,8 @@ public enum ExecutorType { RS_COMPACTED_FILES_DISCHARGER (29), RS_OPEN_PRIORITY_REGION (30), RS_REFRESH_PEER(31), - RS_REPLAY_SYNC_REPLICATION_WAL(32); + RS_REPLAY_SYNC_REPLICATION_WAL(32), + RS_REFRESH_THROTTLE(33); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 8aadb98..a095ed8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -1596,6 +1596,17 @@ public class MasterRpcServices extends RSRpcServices } @Override + public MasterProtos.SwitchThrottleResponse switchThrottle(RpcController controller, + MasterProtos.SwitchThrottleRequest request) throws ServiceException { + try { + master.checkInitialized(); + return master.getMasterQuotaManager().switchThrottle(request); + } catch (Exception e) { + throw new ServiceException(e); + } + } + + @Override public MajorCompactionTimestampResponse getLastMajorCompactionTimestamp(RpcController controller, MajorCompactionTimestampRequest request) throws ServiceException { MajorCompactionTimestampResponse.Builder response = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RefreshThrottleProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RefreshThrottleProcedure.java new file mode 100644 index 0000000..38d8812 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RefreshThrottleProcedure.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.replication.regionserver.RefreshThrottleCallable; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class RefreshThrottleProcedure extends Procedure + implements RemoteProcedure, ServerProcedureInterface { + + private static final Logger LOG = LoggerFactory.getLogger(RefreshThrottleProcedure.class); + private ServerName targetServer; + + public RefreshThrottleProcedure() { + } + + public RefreshThrottleProcedure(ServerName serverName) { + this.targetServer = serverName; + } + + private boolean dispatched; + private ProcedureEvent event; + private boolean succ; + + @Override + protected Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (succ) { + return null; + } + dispatched = false; + } + try { + env.getRemoteDispatcher().addOperationToNode(targetServer, this); + } catch (FailedRemoteDispatchException frde) { + LOG.info("Can not add remote operation for refreshing throttle on {}, ", targetServer, frde); + return null; + } + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + MasterProcedureProtos.RefreshThrottleParameter.newBuilder() + .setTargetServer(ProtobufUtil.toServerName(targetServer)).build(); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + MasterProcedureProtos.RefreshThrottleParameter data = + serializer.deserialize(MasterProcedureProtos.RefreshThrottleParameter.class); + targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + } + + @Override + public RemoteProcedureDispatcher.RemoteOperation + remoteCallBuild(MasterProcedureEnv masterProcedureEnv, ServerName remote) { + assert targetServer.equals(remote); + return new RSProcedureDispatcher.ServerOperation(this, getProcId(), + RefreshThrottleCallable.class, MasterProcedureProtos.RefreshThrottleParameter.newBuilder() + .setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray()); + } + + @Override + public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, + IOException exception) { + complete(env, exception); + } + + @Override + public void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, null); + } + + @Override + public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { + complete(env, error); + } + + @Override + public ServerName getServerName() { + return targetServer; + } + + @Override + public boolean hasMetaTableRegion() { + return false; + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.CRASH_HANDLER; + } + + private void complete(MasterProcedureEnv env, Throwable error) { + if (error != null) { + LOG.error("Refresh throttle on {} failed", targetServer, error); + this.succ = false; + } else { + LOG.info("Refresh throttle on {} suceeded", targetServer); + this.succ = true; + } + event.wake(env.getProcedureScheduler()); + event = null; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchThrottleProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchThrottleProcedure.java new file mode 100644 index 0000000..15fde6a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SwitchThrottleProcedure.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.zookeeper.ThrottleStateTracker; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchThrottleState; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.SwitchThrottleStateData; + +@InterfaceAudience.Private +public class SwitchThrottleProcedure + extends StateMachineProcedure + implements ServerProcedureInterface { + + private static Logger LOG = LoggerFactory.getLogger(SwitchThrottleProcedure.class); + + boolean throttleOn; + ProcedurePrepareLatch syncLatch; + ServerName serverName; + ThrottleStateTracker throttleStateTracker; + + public SwitchThrottleProcedure() { + } + + public SwitchThrottleProcedure(boolean throttleOn, ServerName serverName, + final ProcedurePrepareLatch syncLatch, final ThrottleStateTracker throttleStateTracker) { + this.syncLatch = syncLatch; + this.throttleOn = throttleOn; + this.serverName = serverName; + this.throttleStateTracker = throttleStateTracker; + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, SwitchThrottleState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + switch (state) { + case SWITCH_THROTTLE_UPDATE_ZK: + try { + setThrottleState(throttleOn); + } catch (IOException e) { + LOG.error("Failed to update zk for switching throttle {}", throttleOn, e); + ProcedurePrepareLatch.releaseLatch(syncLatch, this); + return Flow.NO_MORE_STATE; + } + setNextState(SwitchThrottleState.SWITCH_THROTTLE_REFRESH_RS); + return Flow.HAS_MORE_STATE; + case SWITCH_THROTTLE_REFRESH_RS: + RefreshThrottleProcedure[] subProcedures = + env.getMasterServices().getServerManager().getOnlineServersList().stream() + .map(sn -> new RefreshThrottleProcedure(sn)).toArray(RefreshThrottleProcedure[]::new); + addChildProcedure(subProcedures); + setNextState(SwitchThrottleState.SWITCH_THROTTLE_POST); + return Flow.HAS_MORE_STATE; + case SWITCH_THROTTLE_POST: + ProcedurePrepareLatch.releaseLatch(syncLatch, this); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + @Override + protected void rollbackState(MasterProcedureEnv env, SwitchThrottleState state) + throws IOException, InterruptedException { + throw new UnsupportedOperationException("unhandled state=" + state); + } + + @Override + protected SwitchThrottleState getState(int stateId) { + return SwitchThrottleState.forNumber(stateId); + } + + @Override + protected int getStateId(SwitchThrottleState throttleState) { + return throttleState.getNumber(); + } + + @Override + protected SwitchThrottleState getInitialState() { + return SwitchThrottleState.SWITCH_THROTTLE_UPDATE_ZK; + } + + @Override + protected SwitchThrottleState getCurrentState() { + return super.getCurrentState(); + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + serializer.serialize(SwitchThrottleStateData.newBuilder().setThrottleOn(throttleOn).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + SwitchThrottleStateData data = serializer.deserialize(SwitchThrottleStateData.class); + throttleOn = data.getThrottleOn(); + } + + @Override + public ServerName getServerName() { + return serverName; + } + + @Override + public boolean hasMetaTableRegion() { + return false; + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.CRASH_HANDLER; + } + + public void setThrottleState(boolean throttleOn) throws IOException { + try { + throttleStateTracker.setThrottleState(throttleOn); + } catch (KeeperException e) { + throw new IOException("Unable to set throttle state ", e); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java index bdeab80..b4f8f72 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/MasterQuotaManager.java @@ -39,8 +39,11 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; +import org.apache.hadoop.hbase.master.procedure.SwitchThrottleProcedure; import org.apache.hadoop.hbase.namespace.NamespaceAuditor; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.zookeeper.ThrottleStateTracker; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; @@ -54,6 +57,8 @@ import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SetQuotaResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchThrottleRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.SwitchThrottleResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.FileArchiveNotificationRequest.FileWithSize; @@ -79,6 +84,8 @@ public class MasterQuotaManager implements RegionStateListener { private boolean initialized = false; private NamespaceAuditor namespaceQuotaManager; private ConcurrentHashMap regionSizes; + // Tracker for quota throttle on/off state + private ThrottleStateTracker throttleStateTracker; public MasterQuotaManager(final MasterServices masterServices) { this.masterServices = masterServices; @@ -107,6 +114,9 @@ public class MasterQuotaManager implements RegionStateListener { namespaceQuotaManager = new NamespaceAuditor(masterServices); namespaceQuotaManager.start(); initialized = true; + + throttleStateTracker = new ThrottleStateTracker(masterServices.getZooKeeper(), masterServices); + throttleStateTracker.start(); } public void stop() { @@ -310,6 +320,20 @@ public class MasterQuotaManager implements RegionStateListener { } } + public SwitchThrottleResponse switchThrottle(SwitchThrottleRequest request) throws IOException { + if (initialized) { + boolean oldStatus = throttleStateTracker.isThrottleOn(); + ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch(); + SwitchThrottleProcedure procedure = new SwitchThrottleProcedure(request.getThrottleOn(), + masterServices.getServerName(), latch, throttleStateTracker); + masterServices.getMasterProcedureExecutor().submitProcedure(procedure); + latch.await(); + return SwitchThrottleResponse.newBuilder().setPreviousThrottleOn(oldStatus).build(); + } else { + return SwitchThrottleResponse.newBuilder().setPreviousThrottleOn(false).build(); + } + } + private void setQuota(final SetQuotaRequest req, final SetQuotaOperations quotaOps) throws IOException, InterruptedException { if (req.hasRemoveAll() && req.getRemoveAll() == true) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index 7c21f45..ebc7f77 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -27,12 +27,12 @@ import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.ipc.RpcScheduler; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.zookeeper.ThrottleStateTracker; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; @@ -51,13 +51,18 @@ public class RegionServerRpcQuotaManager { private final RegionServerServices rsServices; + private ThrottleStateTracker throttleStateTracker; + private volatile boolean isCurrentThrottleOn; private QuotaCache quotaCache = null; public RegionServerRpcQuotaManager(final RegionServerServices rsServices) { this.rsServices = rsServices; + throttleStateTracker = new ThrottleStateTracker(rsServices.getZooKeeper(), rsServices); + throttleStateTracker.start(); + isCurrentThrottleOn = throttleStateTracker.isThrottleOn(); } - public void start(final RpcScheduler rpcScheduler) throws IOException { + public void start() throws IOException { if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { LOG.info("Quota support disabled"); return; @@ -71,13 +76,25 @@ public class RegionServerRpcQuotaManager { } public void stop() { - if (isQuotaEnabled()) { + if (QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { quotaCache.stop("shutdown"); } } + public void refreshThrottleState() { + if (QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) { + if (throttleStateTracker.isThrottleOn()) { + isCurrentThrottleOn = true; + LOG.info("Enable throttle"); + } else { + isCurrentThrottleOn = false; + LOG.info("Disable throttle"); + } + } + } + public boolean isQuotaEnabled() { - return quotaCache != null; + return quotaCache != null && isCurrentThrottleOn; } @VisibleForTesting diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 02815c5..89faea2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -962,7 +962,7 @@ public class HRegionServer extends HasThread implements } // Start the Quota Manager if (this.rsQuotaManager != null) { - rsQuotaManager.start(getRpcServer().getScheduler()); + rsQuotaManager.start(); } if (this.rsSpaceQuotaManager != null) { this.rsSpaceQuotaManager.start(); @@ -1926,6 +1926,8 @@ public class HRegionServer extends HasThread implements conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2)); this.executorService.startExecutorService(ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL, conf.getInt("hbase.regionserver.executor.replay.sync.replication.wal.threads", 1)); + this.executorService.startExecutorService(ExecutorType.RS_REFRESH_THROTTLE, + conf.getInt("hbase.regionserver.executor.refresh.throttle.threads", 1)); Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", uncaughtExceptionHandler); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshThrottleCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshThrottleCallable.java new file mode 100644 index 0000000..5d59862 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RefreshThrottleCallable.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.replication.regionserver; + +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The callable executed at RS side to refresh throttle state.
+ */ +@InterfaceAudience.Private +public class RefreshThrottleCallable implements RSProcedureCallable { + private HRegionServer rs; + + @Override + public Void call() throws Exception { + rs.getRegionServerRpcQuotaManager().refreshThrottleState(); + return null; + } + + @Override + public void init(byte[] parameter, HRegionServer rs) { + this.rs = rs; + } + + @Override + public EventType getEventType() { + return EventType.RS_REFRESH_PEER; + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java index b84dc83..9091398 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Test; @@ -515,6 +516,24 @@ public class TestQuotaAdmin { } + @Test + public void testSwitchThrottle() throws IOException { + Admin admin = TEST_UTIL.getAdmin(); + testSwitchThrottle(admin, true, true); + testSwitchThrottle(admin, true, false); + testSwitchThrottle(admin, false, false); + testSwitchThrottle(admin, false, true); + } + + private void testSwitchThrottle(Admin admin, boolean oldThrottleState, + boolean currentThrottleState) throws IOException { + boolean state = admin.switchThrottle(currentThrottleState); + Assert.assertEquals(oldThrottleState, state); + TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() + .forEach(rs -> Assert.assertEquals(currentThrottleState, + rs.getRegionServer().getRegionServerRpcQuotaManager().isQuotaEnabled())); + } + private void verifyRecordPresentInQuotaTable(ThrottleType type, long limit, TimeUnit tu) throws Exception { // Verify the RPC Quotas in the table diff --git a/hbase-shell/src/main/ruby/hbase/quotas.rb b/hbase-shell/src/main/ruby/hbase/quotas.rb index 1ea8d28..50b5cce 100644 --- a/hbase-shell/src/main/ruby/hbase/quotas.rb +++ b/hbase-shell/src/main/ruby/hbase/quotas.rb @@ -243,6 +243,10 @@ module Hbase QuotaTableUtil.getObservedSnapshotSizes(@admin.getConnection) end + def throttle_switch(enableDisable) + @admin.switchThrottle(java.lang.Boolean.valueOf(enableDisable)) + end + def _parse_size(str_limit) str_limit = str_limit.downcase match = /(\d+)([bkmgtp%]*)/.match(str_limit) diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb index 17563d3..1c6bb29 100644 --- a/hbase-shell/src/main/ruby/shell.rb +++ b/hbase-shell/src/main/ruby/shell.rb @@ -432,6 +432,8 @@ Shell.load_command_group( list_quota_table_sizes list_quota_snapshots list_snapshot_sizes + enable_throttle + disable_throttle ] ) diff --git a/hbase-shell/src/main/ruby/shell/commands/disable_throttle.rb b/hbase-shell/src/main/ruby/shell/commands/disable_throttle.rb new file mode 100644 index 0000000..b63aa42 --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/disable_throttle.rb @@ -0,0 +1,40 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class DisableThrottle < Command + def help + return <<-EOF +Turn quota throttle off. Returns previous throttle state. +NOTE: if quota is not enabled, this will not work. + +Examples: + hbase> disable_throttle +EOF + end + + def command() + prev_state = quotas_admin.throttle_switch(false) ? 'true' : 'false' + formatter.row(["Previous throttle state : #{prev_state}"]) + prev_state + end + end + end +end diff --git a/hbase-shell/src/main/ruby/shell/commands/enable_throttle.rb b/hbase-shell/src/main/ruby/shell/commands/enable_throttle.rb new file mode 100644 index 0000000..75c33ab --- /dev/null +++ b/hbase-shell/src/main/ruby/shell/commands/enable_throttle.rb @@ -0,0 +1,40 @@ +# +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Shell + module Commands + class EnableThrottle < Command + def help + return <<-EOF +Turn quota throttle on. Returns previous throttle state. +NOTE: if quota is not enabled, this will not work. + +Examples: + hbase> enable_throttle +EOF + end + + def command() + prev_state = quotas_admin.throttle_switch(true) ? 'true' : 'false' + formatter.row(["Previous throttle state : #{prev_state}"]) + prev_state + end + end + end +end diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ThrottleStateTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ThrottleStateTracker.java new file mode 100644 index 0000000..51ec0fa --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/ThrottleStateTracker.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.zookeeper; + +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; + +/** + * Tracks the quota throttle state in ZK + */ +@InterfaceAudience.Private +public class ThrottleStateTracker extends ZKNodeTracker { + + public ThrottleStateTracker(ZKWatcher watcher, Abortable abortable) { + super(watcher, watcher.getZNodePaths().throttleZNode, abortable); + } + + public boolean isThrottleOn() { + byte[] upData = super.getData(false); + return upData == null || Bytes.toBoolean(upData); + } + + public void setThrottleState(boolean state) throws KeeperException { + byte[] upData = Bytes.toBytes(state); + try { + ZKUtil.setData(watcher, node, upData); + } catch (KeeperException.NoNodeException nne) { + ZKUtil.createAndWatch(watcher, node, upData); + } + super.nodeDataChanged(node); + } +} \ No newline at end of file -- 2.7.4