From 62741d5a815676f8de331205d7732a818e0c7dcf Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 13 Dec 2017 21:20:59 +0800 Subject: [PATCH] HBASE-19216 Implement a general framework to execute remote procedure on RS --- .../apache/hadoop/hbase/executor/EventType.java | 317 -------------------- .../apache/hadoop/hbase/executor/ExecutorType.java | 62 ---- .../hbase/procedure2/LockedResourceType.java | 4 +- .../procedure2/RemoteProcedureDispatcher.java | 2 + .../src/main/protobuf/Admin.proto | 9 +- .../src/main/protobuf/MasterProcedure.proto | 30 ++ .../src/main/protobuf/RegionServerStatus.proto | 15 + .../apache/hadoop/hbase/executor/EventType.java | 324 +++++++++++++++++++++ .../apache/hadoop/hbase/executor/ExecutorType.java | 63 ++++ .../org/apache/hadoop/hbase/master/HMaster.java | 31 +- .../hadoop/hbase/master/MasterRpcServices.java | 13 + .../assignment/RegionTransitionProcedure.java | 12 + .../master/procedure/MasterProcedureScheduler.java | 146 +++++++++- .../master/procedure/PeerProcedureInterface.java | 34 +++ .../master/procedure/RSProcedureDispatcher.java | 74 +++-- .../master/replication/ModifyPeerProcedure.java | 127 ++++++++ .../master/replication/RefreshPeerCallable.java | 67 +++++ .../master/replication/RefreshPeerProcedure.java | 204 +++++++++++++ .../hbase/procedure2/RSProcedureCallable.java | 43 +++ .../hadoop/hbase/regionserver/HRegionServer.java | 68 ++++- .../hadoop/hbase/regionserver/RSRpcServices.java | 62 ++-- .../regionserver/handler/RSProcedureHandler.java | 51 ++++ .../master/assignment/TestAssignmentManager.java | 20 +- .../replication/DummyModifyPeerProcedure.java | 41 +++ .../replication/TestDummyModifyPeerProcedure.java | 80 +++++ .../security/access/TestAccessController.java | 7 +- 26 files changed, 1444 insertions(+), 462 deletions(-) delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java delete mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java create 
mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java deleted file mode 100644 index 03eb55a..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ /dev/null @@ -1,317 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.executor; - -import org.apache.yetus.audience.InterfaceAudience; - -/** - * List of all HBase event handler types. Event types are named by a - * convention: event type names specify the component from which the event - * originated and then where its destined -- e.g. RS2ZK_ prefix means the - * event came from a regionserver destined for zookeeper -- and then what - * the even is; e.g. REGION_OPENING. - * - *

We give the enums indices so we can add types later and keep them - * grouped together rather than have to add them always to the end as we - * would have to if we used raw enum ordinals. - */ -@InterfaceAudience.Private -public enum EventType { - // Messages originating from RS (NOTE: there is NO direct communication from - // RS to Master). These are a result of RS updates into ZK. - // RS_ZK_REGION_CLOSING (1), // It is replaced by M_ZK_REGION_CLOSING(HBASE-4739) - - /** - * RS_ZK_REGION_CLOSED
- * - * RS has finished closing a region. - */ - RS_ZK_REGION_CLOSED (2, ExecutorType.MASTER_CLOSE_REGION), - /** - * RS_ZK_REGION_OPENING
- * - * RS is in process of opening a region. - */ - RS_ZK_REGION_OPENING (3, null), - /** - * RS_ZK_REGION_OPENED
- * - * RS has finished opening a region. - */ - RS_ZK_REGION_OPENED (4, ExecutorType.MASTER_OPEN_REGION), - /** - * RS_ZK_REGION_SPLITTING
- * - * RS has started a region split after master says it's ok to move on. - */ - RS_ZK_REGION_SPLITTING (5, null), - /** - * RS_ZK_REGION_SPLIT
- * - * RS split has completed and is notifying the master. - */ - RS_ZK_REGION_SPLIT (6, ExecutorType.MASTER_SERVER_OPERATIONS), - /** - * RS_ZK_REGION_FAILED_OPEN
- * - * RS failed to open a region. - */ - RS_ZK_REGION_FAILED_OPEN (7, ExecutorType.MASTER_CLOSE_REGION), - /** - * RS_ZK_REGION_MERGING
- * - * RS has started merging regions after master says it's ok to move on. - */ - RS_ZK_REGION_MERGING (8, null), - /** - * RS_ZK_REGION_MERGE
- * - * RS region merge has completed and is notifying the master. - */ - RS_ZK_REGION_MERGED (9, ExecutorType.MASTER_SERVER_OPERATIONS), - /** - * RS_ZK_REQUEST_REGION_SPLIT
- * - * RS has requested to split a region. This is to notify master - * and check with master if the region is in a state good to split. - */ - RS_ZK_REQUEST_REGION_SPLIT (10, null), - /** - * RS_ZK_REQUEST_REGION_MERGE
- * - * RS has requested to merge two regions. This is to notify master - * and check with master if two regions is in states good to merge. - */ - RS_ZK_REQUEST_REGION_MERGE (11, null), - - /** - * Messages originating from Master to RS.
- * M_RS_OPEN_REGION
- * Master asking RS to open a region. - */ - M_RS_OPEN_REGION (20, ExecutorType.RS_OPEN_REGION), - /** - * Messages originating from Master to RS.
- * M_RS_OPEN_ROOT
- * Master asking RS to open root. - */ - M_RS_OPEN_ROOT (21, ExecutorType.RS_OPEN_ROOT), - /** - * Messages originating from Master to RS.
- * M_RS_OPEN_META
- * Master asking RS to open meta. - */ - M_RS_OPEN_META (22, ExecutorType.RS_OPEN_META), - /** - * Messages originating from Master to RS.
- * M_RS_CLOSE_REGION
- * Master asking RS to close a region. - */ - M_RS_CLOSE_REGION (23, ExecutorType.RS_CLOSE_REGION), - /** - * Messages originating from Master to RS.
- * M_RS_CLOSE_ROOT
- * Master asking RS to close root. - */ - M_RS_CLOSE_ROOT (24, ExecutorType.RS_CLOSE_ROOT), - /** - * Messages originating from Master to RS.
- * M_RS_CLOSE_META
- * Master asking RS to close meta. - */ - M_RS_CLOSE_META (25, ExecutorType.RS_CLOSE_META), - /** - * Messages originating from Master to RS.
- * M_RS_OPEN_PRIORITY_REGION
- * Master asking RS to open a priority region. - */ - M_RS_OPEN_PRIORITY_REGION (26, ExecutorType.RS_OPEN_PRIORITY_REGION), - - /** - * Messages originating from Client to Master.
- * C_M_MERGE_REGION
- * Client asking Master to merge regions. - */ - C_M_MERGE_REGION (30, ExecutorType.MASTER_TABLE_OPERATIONS), - /** - * Messages originating from Client to Master.
- * C_M_DELETE_TABLE
- * Client asking Master to delete a table. - */ - C_M_DELETE_TABLE (40, ExecutorType.MASTER_TABLE_OPERATIONS), - /** - * Messages originating from Client to Master.
- * C_M_DISABLE_TABLE
- * Client asking Master to disable a table. - */ - C_M_DISABLE_TABLE (41, ExecutorType.MASTER_TABLE_OPERATIONS), - /** - * Messages originating from Client to Master.
- * C_M_ENABLE_TABLE
- * Client asking Master to enable a table. - */ - C_M_ENABLE_TABLE (42, ExecutorType.MASTER_TABLE_OPERATIONS), - /** - * Messages originating from Client to Master.
- * C_M_MODIFY_TABLE
- * Client asking Master to modify a table. - */ - C_M_MODIFY_TABLE (43, ExecutorType.MASTER_TABLE_OPERATIONS), - /** - * Messages originating from Client to Master.
- * C_M_ADD_FAMILY
- * Client asking Master to add family to table. - */ - C_M_ADD_FAMILY (44, null), - /** - * Messages originating from Client to Master.
- * C_M_DELETE_FAMILY
- * Client asking Master to delete family of table. - */ - C_M_DELETE_FAMILY (45, null), - /** - * Messages originating from Client to Master.
- * C_M_MODIFY_FAMILY
- * Client asking Master to modify family of table. - */ - C_M_MODIFY_FAMILY (46, null), - /** - * Messages originating from Client to Master.
- * C_M_CREATE_TABLE
- * Client asking Master to create a table. - */ - C_M_CREATE_TABLE (47, ExecutorType.MASTER_TABLE_OPERATIONS), - /** - * Messages originating from Client to Master.
- * C_M_SNAPSHOT_TABLE
- * Client asking Master to snapshot an offline table. - */ - C_M_SNAPSHOT_TABLE (48, ExecutorType.MASTER_TABLE_OPERATIONS), - /** - * Messages originating from Client to Master.
- * C_M_RESTORE_SNAPSHOT
- * Client asking Master to restore a snapshot. - */ - C_M_RESTORE_SNAPSHOT (49, ExecutorType.MASTER_TABLE_OPERATIONS), - - // Updates from master to ZK. This is done by the master and there is - // nothing to process by either Master or RS - /** - * M_ZK_REGION_OFFLINE - * Master adds this region as offline in ZK - */ - M_ZK_REGION_OFFLINE (50, null), - /** - * M_ZK_REGION_CLOSING - * Master adds this region as closing in ZK - */ - M_ZK_REGION_CLOSING (51, null), - - /** - * Master controlled events to be executed on the master - * M_SERVER_SHUTDOWN - * Master is processing shutdown of a RS - */ - M_SERVER_SHUTDOWN (70, ExecutorType.MASTER_SERVER_OPERATIONS), - /** - * Master controlled events to be executed on the master.
- * M_META_SERVER_SHUTDOWN
- * Master is processing shutdown of RS hosting a meta region (-ROOT- or hbase:meta). - */ - M_META_SERVER_SHUTDOWN (72, ExecutorType.MASTER_META_SERVER_OPERATIONS), - /** - * Master controlled events to be executed on the master.
- * - * M_MASTER_RECOVERY
- * Master is processing recovery of regions found in ZK RIT - */ - M_MASTER_RECOVERY (73, ExecutorType.MASTER_SERVER_OPERATIONS), - /** - * Master controlled events to be executed on the master.
- * - * M_LOG_REPLAY
- * Master is processing log replay of failed region server - */ - M_LOG_REPLAY (74, ExecutorType.M_LOG_REPLAY_OPS), - - /** - * RS controlled events to be executed on the RS.
- * - * RS_PARALLEL_SEEK - */ - RS_PARALLEL_SEEK (80, ExecutorType.RS_PARALLEL_SEEK), - - /** - * RS wal recovery work items (splitting wals) to be executed on the RS.
- * - * RS_LOG_REPLAY - */ - RS_LOG_REPLAY (81, ExecutorType.RS_LOG_REPLAY_OPS), - - /** - * RS flush triggering from secondary region replicas to primary region replica.
- * - * RS_REGION_REPLICA_FLUSH - */ - RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS), - - /** - * RS compacted files discharger
- * - * RS_COMPACTED_FILES_DISCHARGER - */ - RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER); - - private final int code; - private final ExecutorType executor; - - /** - * Constructor - */ - EventType(final int code, final ExecutorType executor) { - this.code = code; - this.executor = executor; - } - - public int getCode() { - return this.code; - } - - public static EventType get(final int code) { - // Is this going to be slow? Its used rare but still... - for (EventType et: EventType.values()) { - if (et.getCode() == code) return et; - } - throw new IllegalArgumentException("Unknown code " + code); - } - - public boolean isOnlineSchemaChangeSupported() { - return ( - this.equals(EventType.C_M_ADD_FAMILY) || - this.equals(EventType.C_M_DELETE_FAMILY) || - this.equals(EventType.C_M_MODIFY_FAMILY) || - this.equals(EventType.C_M_MODIFY_TABLE) - ); - } - - ExecutorType getExecutorServiceType() { - return this.executor; - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java deleted file mode 100644 index 548c7a4..0000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.executor; - -import org.apache.yetus.audience.InterfaceAudience; - -/** - * The following is a list of all executor types, both those that run in the - * master and those that run in the regionserver. - */ -@InterfaceAudience.Private -public enum ExecutorType { - - // Master executor services - MASTER_CLOSE_REGION (1), - MASTER_OPEN_REGION (2), - MASTER_SERVER_OPERATIONS (3), - MASTER_TABLE_OPERATIONS (4), - MASTER_RS_SHUTDOWN (5), - MASTER_META_SERVER_OPERATIONS (6), - M_LOG_REPLAY_OPS (7), - - // RegionServer executor services - RS_OPEN_REGION (20), - RS_OPEN_ROOT (21), - RS_OPEN_META (22), - RS_CLOSE_REGION (23), - RS_CLOSE_ROOT (24), - RS_CLOSE_META (25), - RS_PARALLEL_SEEK (26), - RS_LOG_REPLAY_OPS (27), - RS_REGION_REPLICA_FLUSH_OPS (28), - RS_COMPACTED_FILES_DISCHARGER (29), - RS_OPEN_PRIORITY_REGION (30); - - ExecutorType(int value) {} - - /** - * @param serverName - * @return Conflation of the executor type and the passed servername. 
- */ - String getExecutorName(String serverName) { - return this.toString() + "-" + serverName.replace("%", "%%"); - } -} diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java index c5fe62b..dc9b5d4 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -22,5 +22,5 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public enum LockedResourceType { - SERVER, NAMESPACE, TABLE, REGION + SERVER, NAMESPACE, TABLE, REGION, PEER } diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java index 2b66e7c..9423559 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java @@ -228,6 +228,8 @@ public abstract class RemoteProcedureDispatcherWe give the enums indices so we can add types later and keep them + * grouped together rather than have to add them always to the end as we + * would have to if we used raw enum ordinals. + */ +@InterfaceAudience.Private +public enum EventType { + // Messages originating from RS (NOTE: there is NO direct communication from + // RS to Master). These are a result of RS updates into ZK. + // RS_ZK_REGION_CLOSING (1), // It is replaced by M_ZK_REGION_CLOSING(HBASE-4739) + + /** + * RS_ZK_REGION_CLOSED
+ * + * RS has finished closing a region. + */ + RS_ZK_REGION_CLOSED (2, ExecutorType.MASTER_CLOSE_REGION), + /** + * RS_ZK_REGION_OPENING
+ * + * RS is in process of opening a region. + */ + RS_ZK_REGION_OPENING (3, null), + /** + * RS_ZK_REGION_OPENED
+ * + * RS has finished opening a region. + */ + RS_ZK_REGION_OPENED (4, ExecutorType.MASTER_OPEN_REGION), + /** + * RS_ZK_REGION_SPLITTING
+ * + * RS has started a region split after master says it's ok to move on. + */ + RS_ZK_REGION_SPLITTING (5, null), + /** + * RS_ZK_REGION_SPLIT
+ * + * RS split has completed and is notifying the master. + */ + RS_ZK_REGION_SPLIT (6, ExecutorType.MASTER_SERVER_OPERATIONS), + /** + * RS_ZK_REGION_FAILED_OPEN
+ * + * RS failed to open a region. + */ + RS_ZK_REGION_FAILED_OPEN (7, ExecutorType.MASTER_CLOSE_REGION), + /** + * RS_ZK_REGION_MERGING
+ * + * RS has started merging regions after master says it's ok to move on. + */ + RS_ZK_REGION_MERGING (8, null), + /** + * RS_ZK_REGION_MERGED
+ * + * RS region merge has completed and is notifying the master. + */ + RS_ZK_REGION_MERGED (9, ExecutorType.MASTER_SERVER_OPERATIONS), + /** + * RS_ZK_REQUEST_REGION_SPLIT
+ * + * RS has requested to split a region. This is to notify master + * and check with master if the region is in a state good to split. + */ + RS_ZK_REQUEST_REGION_SPLIT (10, null), + /** + * RS_ZK_REQUEST_REGION_MERGE
+ * + * RS has requested to merge two regions. This is to notify master + * and check with master if the two regions are in states good to merge. + */ + RS_ZK_REQUEST_REGION_MERGE (11, null), + + /** + * Messages originating from Master to RS.
+ * M_RS_OPEN_REGION
+ * Master asking RS to open a region. + */ + M_RS_OPEN_REGION (20, ExecutorType.RS_OPEN_REGION), + /** + * Messages originating from Master to RS.
+ * M_RS_OPEN_ROOT
+ * Master asking RS to open root. + */ + M_RS_OPEN_ROOT (21, ExecutorType.RS_OPEN_ROOT), + /** + * Messages originating from Master to RS.
+ * M_RS_OPEN_META
+ * Master asking RS to open meta. + */ + M_RS_OPEN_META (22, ExecutorType.RS_OPEN_META), + /** + * Messages originating from Master to RS.
+ * M_RS_CLOSE_REGION
+ * Master asking RS to close a region. + */ + M_RS_CLOSE_REGION (23, ExecutorType.RS_CLOSE_REGION), + /** + * Messages originating from Master to RS.
+ * M_RS_CLOSE_ROOT
+ * Master asking RS to close root. + */ + M_RS_CLOSE_ROOT (24, ExecutorType.RS_CLOSE_ROOT), + /** + * Messages originating from Master to RS.
+ * M_RS_CLOSE_META
+ * Master asking RS to close meta. + */ + M_RS_CLOSE_META (25, ExecutorType.RS_CLOSE_META), + /** + * Messages originating from Master to RS.
+ * M_RS_OPEN_PRIORITY_REGION
+ * Master asking RS to open a priority region. + */ + M_RS_OPEN_PRIORITY_REGION (26, ExecutorType.RS_OPEN_PRIORITY_REGION), + + /** + * Messages originating from Client to Master.
+ * C_M_MERGE_REGION
+ * Client asking Master to merge regions. + */ + C_M_MERGE_REGION (30, ExecutorType.MASTER_TABLE_OPERATIONS), + /** + * Messages originating from Client to Master.
+ * C_M_DELETE_TABLE
+ * Client asking Master to delete a table. + */ + C_M_DELETE_TABLE (40, ExecutorType.MASTER_TABLE_OPERATIONS), + /** + * Messages originating from Client to Master.
+ * C_M_DISABLE_TABLE
+ * Client asking Master to disable a table. + */ + C_M_DISABLE_TABLE (41, ExecutorType.MASTER_TABLE_OPERATIONS), + /** + * Messages originating from Client to Master.
+ * C_M_ENABLE_TABLE
+ * Client asking Master to enable a table. + */ + C_M_ENABLE_TABLE (42, ExecutorType.MASTER_TABLE_OPERATIONS), + /** + * Messages originating from Client to Master.
+ * C_M_MODIFY_TABLE
+ * Client asking Master to modify a table. + */ + C_M_MODIFY_TABLE (43, ExecutorType.MASTER_TABLE_OPERATIONS), + /** + * Messages originating from Client to Master.
+ * C_M_ADD_FAMILY
+ * Client asking Master to add family to table. + */ + C_M_ADD_FAMILY (44, null), + /** + * Messages originating from Client to Master.
+ * C_M_DELETE_FAMILY
+ * Client asking Master to delete family of table. + */ + C_M_DELETE_FAMILY (45, null), + /** + * Messages originating from Client to Master.
+ * C_M_MODIFY_FAMILY
+ * Client asking Master to modify family of table. + */ + C_M_MODIFY_FAMILY (46, null), + /** + * Messages originating from Client to Master.
+ * C_M_CREATE_TABLE
+ * Client asking Master to create a table. + */ + C_M_CREATE_TABLE (47, ExecutorType.MASTER_TABLE_OPERATIONS), + /** + * Messages originating from Client to Master.
+ * C_M_SNAPSHOT_TABLE
+ * Client asking Master to snapshot an offline table. + */ + C_M_SNAPSHOT_TABLE (48, ExecutorType.MASTER_TABLE_OPERATIONS), + /** + * Messages originating from Client to Master.
+ * C_M_RESTORE_SNAPSHOT
+ * Client asking Master to restore a snapshot. + */ + C_M_RESTORE_SNAPSHOT (49, ExecutorType.MASTER_TABLE_OPERATIONS), + + // Updates from master to ZK. This is done by the master and there is + // nothing to process by either Master or RS + /** + * M_ZK_REGION_OFFLINE + * Master adds this region as offline in ZK + */ + M_ZK_REGION_OFFLINE (50, null), + /** + * M_ZK_REGION_CLOSING + * Master adds this region as closing in ZK + */ + M_ZK_REGION_CLOSING (51, null), + + /** + * Master controlled events to be executed on the master + * M_SERVER_SHUTDOWN + * Master is processing shutdown of a RS + */ + M_SERVER_SHUTDOWN (70, ExecutorType.MASTER_SERVER_OPERATIONS), + /** + * Master controlled events to be executed on the master.
+ * M_META_SERVER_SHUTDOWN
+ * Master is processing shutdown of RS hosting a meta region (-ROOT- or hbase:meta). + */ + M_META_SERVER_SHUTDOWN (72, ExecutorType.MASTER_META_SERVER_OPERATIONS), + /** + * Master controlled events to be executed on the master.
+ * + * M_MASTER_RECOVERY
+ * Master is processing recovery of regions found in ZK RIT + */ + M_MASTER_RECOVERY (73, ExecutorType.MASTER_SERVER_OPERATIONS), + /** + * Master controlled events to be executed on the master.
+ * + * M_LOG_REPLAY
+ * Master is processing log replay of failed region server + */ + M_LOG_REPLAY (74, ExecutorType.M_LOG_REPLAY_OPS), + + /** + * RS controlled events to be executed on the RS.
+ * + * RS_PARALLEL_SEEK + */ + RS_PARALLEL_SEEK (80, ExecutorType.RS_PARALLEL_SEEK), + + /** + * RS wal recovery work items (splitting wals) to be executed on the RS.
+ * + * RS_LOG_REPLAY + */ + RS_LOG_REPLAY (81, ExecutorType.RS_LOG_REPLAY_OPS), + + /** + * RS flush triggering from secondary region replicas to primary region replica.
+ * + * RS_REGION_REPLICA_FLUSH + */ + RS_REGION_REPLICA_FLUSH (82, ExecutorType.RS_REGION_REPLICA_FLUSH_OPS), + + /** + * RS compacted files discharger
+ * + * RS_COMPACTED_FILES_DISCHARGER + */ + RS_COMPACTED_FILES_DISCHARGER (83, ExecutorType.RS_COMPACTED_FILES_DISCHARGER), + + /** + * RS refresh peer.
+ * + * RS_REFRESH_PEER + */ + RS_REFRESH_PEER (84, ExecutorType.RS_REFRESH_PEER); + + private final int code; + private final ExecutorType executor; + + /** + * Constructor + */ + EventType(final int code, final ExecutorType executor) { + this.code = code; + this.executor = executor; + } + + public int getCode() { + return this.code; + } + + public static EventType get(final int code) { + // Is this going to be slow? Its used rare but still... + for (EventType et: EventType.values()) { + if (et.getCode() == code) return et; + } + throw new IllegalArgumentException("Unknown code " + code); + } + + public boolean isOnlineSchemaChangeSupported() { + return ( + this.equals(EventType.C_M_ADD_FAMILY) || + this.equals(EventType.C_M_DELETE_FAMILY) || + this.equals(EventType.C_M_MODIFY_FAMILY) || + this.equals(EventType.C_M_MODIFY_TABLE) + ); + } + + ExecutorType getExecutorServiceType() { + return this.executor; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java new file mode 100644 index 0000000..a12b2ea --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -0,0 +1,63 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.executor; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * The following is a list of all executor types, both those that run in the + * master and those that run in the regionserver. + */ +@InterfaceAudience.Private +public enum ExecutorType { + + // Master executor services + MASTER_CLOSE_REGION (1), + MASTER_OPEN_REGION (2), + MASTER_SERVER_OPERATIONS (3), + MASTER_TABLE_OPERATIONS (4), + MASTER_RS_SHUTDOWN (5), + MASTER_META_SERVER_OPERATIONS (6), + M_LOG_REPLAY_OPS (7), + + // RegionServer executor services + RS_OPEN_REGION (20), + RS_OPEN_ROOT (21), + RS_OPEN_META (22), + RS_CLOSE_REGION (23), + RS_CLOSE_ROOT (24), + RS_CLOSE_META (25), + RS_PARALLEL_SEEK (26), + RS_LOG_REPLAY_OPS (27), + RS_REGION_REPLICA_FLUSH_OPS (28), + RS_COMPACTED_FILES_DISCHARGER (29), + RS_OPEN_PRIORITY_REGION (30), + RS_REFRESH_PEER (31); + + ExecutorType(int value) {} + + /** + * @param serverName + * @return Conflation of the executor type and the passed servername. + */ + String getExecutorName(String serverName) { + return this.toString() + "-" + serverName.replace("%", "%%"); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 1c57620..08f764b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -135,6 +134,7 @@ import org.apache.hadoop.hbase.procedure2.LockedResource; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; import org.apache.hadoop.hbase.quotas.MasterQuotaManager; import org.apache.hadoop.hbase.quotas.MasterSpaceQuotaObserver; @@ -326,8 +326,7 @@ public class HMaster extends HRegionServer implements MasterServices { // flag set after we become the active master (used for testing) private volatile boolean activeMaster = false; - // flag set after we complete initialization once active, - // it is not private since it's used in unit tests + // flag set after we complete initialization once active private final ProcedureEvent initialized = new ProcedureEvent("master initialized"); // flag set after master services are started, @@ -3531,4 +3530,28 @@ public class HMaster extends HRegionServer implements MasterServices { public SpaceQuotaSnapshotNotifier getSpaceQuotaSnapshotNotifier() { return this.spaceQuotaSnapshotNotifier; } + + @SuppressWarnings("unchecked") + private RemoteProcedure getRemoteProcedure(long procId) { + Procedure procedure = procedureExecutor.getProcedure(procId); + if (procedure == null) { + return null; + } + assert procedure instanceof RemoteProcedure; + return (RemoteProcedure) procedure; + } + + public void remoteProcedureCompleted(long procId) { + RemoteProcedure procedure = getRemoteProcedure(procId); + if (procedure != null) { + procedure.remoteOperationCompleted(procedureExecutor.getEnvironment()); + } + } + + public void remoteProcedureFailed(long procId, String error) { + RemoteProcedure procedure = getRemoteProcedure(procId); + if (procedure != null) { + 
procedure.remoteOperationFailed(procedureExecutor.getEnvironment(), error); + } + } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index ce85b66..7c08b6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -253,6 +253,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; @@ -2210,4 +2212,15 @@ public class MasterRpcServices extends RSRpcServices } return response.build(); } + + @Override + public ReportProcedureDoneResponse reportProcedureDone(RpcController controller, + ReportProcedureDoneRequest request) throws ServiceException { + if (request.getStatus() == ReportProcedureDoneRequest.Status.SUCCESS) { + master.remoteProcedureCompleted(request.getProcId()); + } else { + master.remoteProcedureFailed(request.getProcId(), request.getError()); + } + return 
ReportProcedureDoneResponse.getDefaultInstance(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java index 6bb2cba..810d7a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionTransitionProcedure.java @@ -412,4 +412,16 @@ public abstract class RegionTransitionProcedure * @return ServerName the Assign or Unassign is going against. */ public abstract ServerName getServer(final MasterProcedureEnv env); + + @Override + public void remoteOperationCompleted(MasterProcedureEnv env) { + // should not be called for region operation until we modified the open/close region procedure + throw new UnsupportedOperationException(); + } + + @Override + public void remoteOperationFailed(MasterProcedureEnv env, String error) { + // should not be called for region operation until we modified the open/close region procedure + throw new UnsupportedOperationException(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 9402845..4bd0bbe 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.master.locking.LockProcedure; +import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface.PeerOperationType; import 
org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler; import org.apache.hadoop.hbase.procedure2.LockAndQueue; @@ -110,12 +111,17 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { new ServerQueueKeyComparator(); private final static TableQueueKeyComparator TABLE_QUEUE_KEY_COMPARATOR = new TableQueueKeyComparator(); + private final static PeerQueueKeyComparator PEER_QUEUE_KEY_COMPARATOR = + new PeerQueueKeyComparator(); private final FairQueue serverRunQueue = new FairQueue<>(); private final FairQueue tableRunQueue = new FairQueue<>(); + private final FairQueue peerRunQueue = new FairQueue<>(); private final ServerQueue[] serverBuckets = new ServerQueue[128]; private TableQueue tableMap = null; + private PeerQueue peerMap = null; + private final SchemaLocking locking = new SchemaLocking(); /** @@ -162,6 +168,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); } else if (isServerProcedure(proc)) { doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront); + } else if (isPeerProcedure(proc)) { + doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront); } else { // TODO: at the moment we only have Table and Server procedures // if you are implementing a non-table/non-server procedure, you have two options: create @@ -173,7 +181,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } private > void doAdd(final FairQueue fairq, - final Queue queue, final Procedure proc, final boolean addFront) { + final Queue queue, final Procedure proc, final boolean addFront) { queue.add(proc, addFront); if (!queue.getLockStatus().hasExclusiveLock() || queue.getLockStatus().isLockOwner(proc.getProcId())) { // if the queue was not remove for an xlock execution @@ -190,7 +198,8 @@ public class 
MasterProcedureScheduler extends AbstractProcedureScheduler { @Override protected boolean queueHasRunnables() { - return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables(); + return tableRunQueue.hasRunnables() || serverRunQueue.hasRunnables() || + peerRunQueue.hasRunnables(); } @Override @@ -198,7 +207,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { // For now, let server handling have precedence over table handling; presumption is that it // is more important handling crashed servers than it is running the // enabling/disabling tables, etc. - Procedure pollResult = doPoll(serverRunQueue); + Procedure pollResult = doPoll(serverRunQueue); + if (pollResult == null) { + pollResult = doPoll(peerRunQueue); + } if (pollResult == null) { pollResult = doPoll(tableRunQueue); } @@ -432,6 +444,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { markTableAsDeleted(iProcTable.getTableName(), proc); return; } + } else if (proc instanceof PeerProcedureInterface) { + PeerProcedureInterface iProcPeer = (PeerProcedureInterface) proc; + if (iProcPeer.getPeerOperationType() == PeerOperationType.REMOVE) { + removePeerQueue(iProcPeer.getPeerId()); + } } else { // No cleanup for ServerProcedureInterface types, yet. 
return; @@ -469,12 +486,11 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { locking.removeTableLock(tableName); } - - private static boolean isTableProcedure(Procedure proc) { + private static boolean isTableProcedure(Procedure proc) { return proc instanceof TableProcedureInterface; } - private static TableName getTableName(Procedure proc) { + private static TableName getTableName(Procedure proc) { return ((TableProcedureInterface)proc).getTableName(); } @@ -495,15 +511,42 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return Math.abs(hashCode) % buckets.length; } - private static boolean isServerProcedure(Procedure proc) { + private static boolean isServerProcedure(Procedure proc) { return proc instanceof ServerProcedureInterface; } - private static ServerName getServerName(Procedure proc) { + private static ServerName getServerName(Procedure proc) { return ((ServerProcedureInterface)proc).getServerName(); } // ============================================================================ + // Peer Queue Lookup Helpers + // ============================================================================ + private PeerQueue getPeerQueue(String peerId) { + PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); + if (node != null) { + return node; + } + node = new PeerQueue(peerId, locking.getPeerLock(peerId)); + peerMap = AvlTree.insert(peerMap, node); + return node; + } + + private void removePeerQueue(String peerId) { + peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR); + locking.removePeerLock(peerId); + } + + + private static boolean isPeerProcedure(Procedure proc) { + return proc instanceof PeerProcedureInterface; + } + + private static String getPeerId(Procedure proc) { + return ((PeerProcedureInterface) proc).getPeerId(); + } + + // ============================================================================ // Table and Server Queue Implementation // 
============================================================================ private static class ServerQueueKeyComparator implements AvlKeyComparator { @@ -572,6 +615,26 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } } + private static class PeerQueueKeyComparator implements AvlKeyComparator { + + @Override + public int compareKey(PeerQueue node, Object key) { + return node.compareKey((String) key); + } + } + + public static class PeerQueue extends Queue { + + public PeerQueue(String peerId, LockStatus lockStatus) { + super(peerId, lockStatus); + } + + @Override + public boolean requireExclusiveLock(Procedure proc) { + return requirePeerExclusiveLock((PeerProcedureInterface) proc); + } + } + // ============================================================================ // Table Locking Helpers // ============================================================================ @@ -959,7 +1022,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param serverName Server to lock * @return true if the procedure has to wait for the server to be available */ - public boolean waitServerExclusiveLock(final Procedure procedure, final ServerName serverName) { + public boolean waitServerExclusiveLock(final Procedure procedure, final ServerName serverName) { schedLock(); try { final LockAndQueue lock = locking.getServerLock(serverName); @@ -981,7 +1044,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { * @param procedure the procedure releasing the lock * @param serverName the server that has the exclusive lock */ - public void wakeServerExclusiveLock(final Procedure procedure, final ServerName serverName) { + public void wakeServerExclusiveLock(final Procedure procedure, final ServerName serverName) { schedLock(); try { final LockAndQueue lock = locking.getServerLock(serverName); @@ -995,6 +1058,56 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } // 
============================================================================ + // Peer Locking Helpers + // ============================================================================ + + private static boolean requirePeerExclusiveLock(PeerProcedureInterface proc) { + return proc.getPeerOperationType() != PeerOperationType.REFRESH; + } + + /** + * Try to acquire the exclusive lock on the specified peer. + * @see #wakePeerExclusiveLock(Procedure, String) + * @param procedure the procedure trying to acquire the lock + * @param peerId peer to lock + * @return true if the procedure has to wait for the peer to be available + */ + public boolean waitPeerExclusiveLock(Procedure procedure, String peerId) { + schedLock(); + try { + final LockAndQueue lock = locking.getPeerLock(peerId); + if (lock.tryExclusiveLock(procedure)) { + removeFromRunQueue(peerRunQueue, getPeerQueue(peerId)); + return false; + } + waitProcedure(lock, procedure); + logLockedResource(LockedResourceType.PEER, peerId); + return true; + } finally { + schedUnlock(); + } + } + + /** + * Wake the procedures waiting for the specified peer + * @see #waitPeerExclusiveLock(Procedure, String) + * @param procedure the procedure releasing the lock + * @param peerId the peer that has the exclusive lock + */ + public void wakePeerExclusiveLock(Procedure procedure, String peerId) { + schedLock(); + try { + final LockAndQueue lock = locking.getPeerLock(peerId); + lock.releaseExclusiveLock(procedure); + addToRunQueue(peerRunQueue, getPeerQueue(peerId)); + int waitingCount = wakeWaitingProcedures(lock); + wakePollIfNeeded(waitingCount); + } finally { + schedUnlock(); + } + } + + // ============================================================================ // Generic Helpers // ============================================================================ private static abstract class Queue> @@ -1099,6 +1212,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { final Map tableLocks = new 
HashMap<>(); // Single map for all regions irrespective of tables. Key is encoded region name. final Map regionLocks = new HashMap<>(); + final Map peerLocks = new HashMap<>(); private LockAndQueue getLock(Map map, T key) { LockAndQueue lock = map.get(key); @@ -1133,6 +1247,14 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return getLock(serverLocks, serverName); } + LockAndQueue getPeerLock(String peerId) { + return getLock(peerLocks, peerId); + } + + LockAndQueue removePeerLock(String peerId) { + return peerLocks.remove(peerId); + } + /** * Removes all locks by clearing the maps. * Used when procedure executor is stopped for failure and recovery testing. @@ -1143,6 +1265,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { namespaceLocks.clear(); tableLocks.clear(); regionLocks.clear(); + peerLocks.clear(); } @Override @@ -1150,7 +1273,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { return "serverLocks=" + filterUnlocked(this.serverLocks) + ", namespaceLocks=" + filterUnlocked(this.namespaceLocks) + ", tableLocks=" + filterUnlocked(this.tableLocks) + - ", regionLocks=" + filterUnlocked(this.regionLocks); + ", regionLocks=" + filterUnlocked(this.regionLocks) + + ", peerLocks=" + filterUnlocked(this.peerLocks); } private String filterUnlocked(Map locks) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java new file mode 100644 index 0000000..4abc9ad --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/PeerProcedureInterface.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface PeerProcedureInterface { + + enum PeerOperationType { + ADD, REMOVE, ENABLE, DISABLE, UPDATE_CONFIG, REFRESH + } + + String getPeerId(); + + PeerOperationType getPeerOperationType(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java index 045c416..9e666ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.hbase.shaded.com.google.common.collect.ArrayListMultimap; +import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException; @@ -50,6 +51,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProc import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; /** * A remote procecdure dispatcher for regionservers. @@ -223,7 +225,10 @@ public class RSProcedureDispatcher private interface RemoteProcedureResolver { void dispatchOpenRequests(MasterProcedureEnv env, List operations); + void dispatchCloseRequests(MasterProcedureEnv env, List operations); + + void dispatchServerOperations(MasterProcedureEnv env, List operations); } /** @@ -235,19 +240,25 @@ public class RSProcedureDispatcher * @param remoteProcedures Remote procedures which are dispatched to the given server * @param resolver Used to dispatch remote procedures to given server. 
*/ - public void splitAndResolveOperation(final ServerName serverName, - final Set remoteProcedures, final RemoteProcedureResolver resolver) { - final ArrayListMultimap, RemoteOperation> reqsByType = - buildAndGroupRequestByType(procedureEnv, serverName, remoteProcedures); + public void splitAndResolveOperation(ServerName serverName, Set operations, + RemoteProcedureResolver resolver) { + MasterProcedureEnv env = master.getMasterProcedureExecutor().getEnvironment(); + ArrayListMultimap, RemoteOperation> reqsByType = + buildAndGroupRequestByType(env, serverName, operations); - final List openOps = fetchType(reqsByType, RegionOpenOperation.class); + List openOps = fetchType(reqsByType, RegionOpenOperation.class); if (!openOps.isEmpty()) { - resolver.dispatchOpenRequests(procedureEnv, openOps); + resolver.dispatchOpenRequests(env, openOps); } - final List closeOps = fetchType(reqsByType, RegionCloseOperation.class); + List closeOps = fetchType(reqsByType, RegionCloseOperation.class); if (!closeOps.isEmpty()) { - resolver.dispatchCloseRequests(procedureEnv, closeOps); + resolver.dispatchCloseRequests(env, closeOps); + } + + List refreshOps = fetchType(reqsByType, ServerOperation.class); + if (!refreshOps.isEmpty()) { + resolver.dispatchServerOperations(env, refreshOps); } if (!reqsByType.isEmpty()) { @@ -278,8 +289,7 @@ public class RSProcedureDispatcher splitAndResolveOperation(getServerName(), remoteProcedures, this); try { - final ExecuteProceduresResponse response = sendRequest(getServerName(), request.build()); - remoteCallCompleted(procedureEnv, response); + sendRequest(getServerName(), request.build()); } catch (IOException e) { e = unwrapException(e); // TODO: In the future some operation may want to bail out early. 
@@ -303,6 +313,11 @@ public class RSProcedureDispatcher } } + @Override + public void dispatchServerOperations(MasterProcedureEnv env, List operations) { + operations.stream().map(o -> o.buildRequest()).forEachOrdered(request::addProc); + } + protected ExecuteProceduresResponse sendRequest(final ServerName serverName, final ExecuteProceduresRequest request) throws IOException { try { @@ -312,17 +327,8 @@ public class RSProcedureDispatcher } } - - private void remoteCallCompleted(final MasterProcedureEnv env, - final ExecuteProceduresResponse response) { - /* - for (RemoteProcedure proc: operations) { - proc.remoteCallCompleted(env, getServerName(), response); - }*/ - } - private void remoteCallFailed(final MasterProcedureEnv env, final IOException e) { - for (RemoteProcedure proc: remoteProcedures) { + for (RemoteProcedure proc : remoteProcedures) { proc.remoteCallFailed(env, getServerName(), e); } } @@ -483,6 +489,11 @@ public class RSProcedureDispatcher submitTask(new CloseRegionRemoteCall(serverName, op)); } } + + @Override + public void dispatchServerOperations(MasterProcedureEnv env, List operations) { + throw new UnsupportedOperationException(); + } } // ========================================================================== @@ -490,13 +501,28 @@ public class RSProcedureDispatcher // - ServerOperation: refreshConfig, grant, revoke, ... (TODO) // - RegionOperation: open, close, flush, snapshot, ... 
// ========================================================================== - /* Currently unused - public static abstract class ServerOperation extends RemoteOperation { - protected ServerOperation(final RemoteProcedure remoteProcedure) { + + public static final class ServerOperation extends RemoteOperation { + + private final long procId; + + private final Class rsProcClass; + + private final byte[] rsProcData; + + public ServerOperation(RemoteProcedure remoteProcedure, long procId, Class rsProcClass, + byte[] rsProcData) { super(remoteProcedure); + this.procId = procId; + this.rsProcClass = rsProcClass; + this.rsProcData = rsProcData; + } + + public RemoteProcedureRequest buildRequest() { + return RemoteProcedureRequest.newBuilder().setProcId(procId) + .setProcClass(rsProcClass.getName()).setProcData(ByteString.copyFrom(rsProcData)).build(); } } - */ public static abstract class RegionOperation extends RemoteOperation { private final RegionInfo regionInfo; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java new file mode 100644 index 0000000..fca05a7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState; + +@InterfaceAudience.Private +public abstract class ModifyPeerProcedure + extends StateMachineProcedure + implements PeerProcedureInterface { + + private static final Log LOG = LogFactory.getLog(ModifyPeerProcedure.class); + + protected String peerId; + + protected ModifyPeerProcedure() { + } + + protected ModifyPeerProcedure(String peerId) { + this.peerId = peerId; + } + + @Override + public String getPeerId() { + return peerId; + } + + /** + * Return {@code false} means that the operation is invalid and we should give up, otherwise + * {@code true}. + *

+ * You need to call {@link #setFailure(String, Throwable)} to give the detail failure information. + */ + protected abstract boolean updatePeerStorage() throws IOException; + + protected void postPeerModification() { + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + switch (state) { + case UPDATE_PEER_STORAGE: + try { + if (!updatePeerStorage()) { + assert isFailed() : "setFailure is not called"; + return Flow.NO_MORE_STATE; + } + } catch (IOException e) { + LOG.warn("update peer storage failed, retry", e); + throw new ProcedureYieldException(); + } + setNextState(PeerModificationState.REFRESH_PEER_ON_RS); + return Flow.HAS_MORE_STATE; + case REFRESH_PEER_ON_RS: + addChildProcedure(env.getMasterServices().getServerManager().getOnlineServersList().stream() + .map(sn -> new RefreshPeerProcedure(peerId, getPeerOperationType(), sn)) + .toArray(RefreshPeerProcedure[]::new)); + setNextState(PeerModificationState.POST_PEER_MODIFICATION); + return Flow.HAS_MORE_STATE; + case POST_PEER_MODIFICATION: + postPeerModification(); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + @Override + protected LockState acquireLock(MasterProcedureEnv env) { + return env.getProcedureScheduler().waitPeerExclusiveLock(this, peerId) + ? 
LockState.LOCK_EVENT_WAIT + : LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(MasterProcedureEnv env) { + env.getProcedureScheduler().wakePeerExclusiveLock(this, peerId); + } + + @Override + protected void rollbackState(MasterProcedureEnv env, PeerModificationState state) + throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected PeerModificationState getState(int stateId) { + return PeerModificationState.forNumber(stateId); + } + + @Override + protected int getStateId(PeerModificationState state) { + return state.getNumber(); + } + + @Override + protected PeerModificationState getInitialState() { + return PeerModificationState.UPDATE_PEER_STORAGE; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java new file mode 100644 index 0000000..56d7ddd --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerCallable.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.replication; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter; + +/** + * The callable executed at RS side to refresh the peer config/state. + *

+ * TODO: only a dummy implementation for verifying the framework, will add implementation later. + */ +@InterfaceAudience.Private +public class RefreshPeerCallable implements RSProcedureCallable { + + private HRegionServer rs; + + private String peerId; + + private Exception initError; + + @Override + public Void call() throws Exception { + if (initError != null) { + throw initError; + } + rs.getFileSystem().create(new Path("/" + peerId + "/" + rs.getServerName().toString())).close(); + return null; + } + + @Override + public void init(byte[] parameter, HRegionServer rs) { + this.rs = rs; + try { + this.peerId = RefreshPeerParameter.parseFrom(parameter).getPeerId(); + } catch (InvalidProtocolBufferException e) { + initError = e; + return; + } + } + + @Override + public EventType getEventType() { + return EventType.RS_REFRESH_PEER; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java new file mode 100644 index 0000000..ee00467 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/RefreshPeerProcedure.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface; +import org.apache.hadoop.hbase.master.procedure.RSProcedureDispatcher.ServerOperation; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteOperation; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerStateData; + +@InterfaceAudience.Private +public class RefreshPeerProcedure extends Procedure + implements PeerProcedureInterface, RemoteProcedure { + + private static final Log LOG = LogFactory.getLog(RefreshPeerProcedure.class); + + private String peerId; + + private PeerOperationType type; + + private ServerName targetServer; + + private boolean dispatched; + + private ProcedureEvent event; + + private boolean succ; + + public RefreshPeerProcedure() { + } + + public RefreshPeerProcedure(String peerId, PeerOperationType type, 
ServerName targetServer) { + this.peerId = peerId; + this.type = type; + this.targetServer = targetServer; + } + + @Override + public String getPeerId() { + return peerId; + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.REFRESH; + } + + private static PeerModificationType toPeerModificationType(PeerOperationType type) { + switch (type) { + case ADD: + return PeerModificationType.ADD_PEER; + case REMOVE: + return PeerModificationType.REMOVE_PEER; + case ENABLE: + return PeerModificationType.ENABLE_PEER; + case DISABLE: + return PeerModificationType.DISABLE_PEER; + case UPDATE_CONFIG: + return PeerModificationType.UPDATE_PEER_CONFIG; + default: + throw new IllegalArgumentException("Unknown type: " + type); + } + } + + private static PeerOperationType toPeerOperationType(PeerModificationType type) { + switch (type) { + case ADD_PEER: + return PeerOperationType.ADD; + case REMOVE_PEER: + return PeerOperationType.REMOVE; + case ENABLE_PEER: + return PeerOperationType.ENABLE; + case DISABLE_PEER: + return PeerOperationType.DISABLE; + case UPDATE_PEER_CONFIG: + return PeerOperationType.UPDATE_CONFIG; + default: + throw new IllegalArgumentException("Unknown type: " + type); + } + } + + @Override + public RemoteOperation remoteCallBuild(MasterProcedureEnv env, ServerName remote) { + assert targetServer.equals(remote); + return new ServerOperation(this, getProcId(), RefreshPeerCallable.class, + RefreshPeerParameter.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type)) + .setTargetServer(ProtobufUtil.toServerName(remote)).build().toByteArray()); + } + + @Override + public void remoteCallCompleted(MasterProcedureEnv env, ServerName remote, + RemoteOperation response) { + // should not be called, this is only for legacy open/close region? 
+ throw new UnsupportedOperationException(); + } + + private void complete(MasterProcedureEnv env, boolean succ) { + if (event == null) { + LOG.warn("procedure event for " + getProcId() + + " is null, maybe the procedure is created when recovery", new Exception()); + return; + } + LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer + + (succ ? " suceeded" : " failed")); + this.succ = succ; + event.wake(env.getProcedureScheduler()); + event = null; + } + + @Override + public synchronized void remoteCallFailed(MasterProcedureEnv env, ServerName remote, + IOException exception) { + complete(env, false); + } + + @Override + public synchronized void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, true); + } + + @Override + public synchronized void remoteOperationFailed(MasterProcedureEnv env, String error) { + complete(env, false); + } + + @Override + protected synchronized Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (succ) { + return null; + } + // retry + dispatched = false; + } + if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) { + LOG.info("Can not add remote operation for refreshing peer " + peerId + " for " + type + + " to " + targetServer + ", this usually because the server is already dead," + + " give up and mark the procedure as complete"); + return null; + } + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + // TODO: no correctness problem if we just ignore this, implement later. 
+ return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + serializer.serialize( + RefreshPeerStateData.newBuilder().setPeerId(peerId).setType(toPeerModificationType(type)) + .setTargetServer(ProtobufUtil.toServerName(targetServer)).build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + RefreshPeerStateData data = serializer.deserialize(RefreshPeerStateData.class); + peerId = data.getPeerId(); + type = toPeerOperationType(data.getType()); + targetServer = ProtobufUtil.toServerName(data.getTargetServer()); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java new file mode 100644 index 0000000..62c2e36 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/RSProcedureCallable.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.procedure2; + +import java.util.concurrent.Callable; + +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A general interface for a sub procedure that runs at the RS side. + */ +@InterfaceAudience.Private +public interface RSProcedureCallable extends Callable { + + /** + * Initialize the callable + * @param parameter the parameter passed from master. + * @param rs the regionserver instance + */ + void init(byte[] parameter, HRegionServer rs); + + /** + * Event type used to select thread pool. + */ + EventType getEventType(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index cb0632d..a2febe8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -17,9 +17,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import javax.management.MalformedObjectNameException; -import javax.management.ObjectName; -import javax.servlet.http.HttpServlet; import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.management.MemoryType; @@ -51,6 +48,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.servlet.http.HttpServlet; + import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.SystemUtils; import org.apache.commons.logging.Log; @@ -118,6 +119,7 @@ import org.apache.hadoop.hbase.master.LoadBalancer; import org.apache.hadoop.hbase.master.RegionState.State; import org.apache.hadoop.hbase.mob.MobCacheConfig; 
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; import org.apache.hadoop.hbase.quotas.FileSystemUtilizationChore; import org.apache.hadoop.hbase.quotas.QuotaUtil; import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; @@ -128,6 +130,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; +import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler; import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; @@ -175,6 +178,7 @@ import org.apache.zookeeper.KeeperException; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; +import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps; import org.apache.hadoop.hbase.shaded.com.google.protobuf.BlockingRpcChannel; import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController; @@ -206,6 +210,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProto import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionSpaceUseReportRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; @@ -1931,6 +1936,8 @@ public class HRegionServer extends HasThread implements conf.getInt("hbase.regionserver.region.replica.flusher.threads", conf.getInt("hbase.regionserver.executor.openregion.threads", 3))); } + this.executorService.startExecutorService(ExecutorType.RS_REFRESH_PEER, + conf.getInt("hbase.regionserver.executor.refresh.peer.threads", 2)); Threads.setDaemonThreadRunning(this.walRoller.getThread(), getName() + ".logRoller", uncaughtExceptionHandler); @@ -3723,4 +3730,59 @@ public class HRegionServer extends HasThread implements return ConnectionUtils.createShortCircuitConnection(conf, null, user, this.serverName, this.rpcServices, this.rpcServices); } + + public void executeProcedure(long procId, RSProcedureCallable callable) { + executorService.submit(new RSProcedureHandler(this, procId, callable)); + } + + public void reportProcedureDone(long procId, Throwable error) { + ReportProcedureDoneRequest.Builder builder = + ReportProcedureDoneRequest.newBuilder().setProcId(procId); + if (error != null) { + builder.setStatus(ReportProcedureDoneRequest.Status.ERROR) + .setError(Throwables.getStackTraceAsString(error)); + } else { + builder.setStatus(ReportProcedureDoneRequest.Status.SUCCESS); + } + ReportProcedureDoneRequest request = builder.build(); + int tries = 0; + long pauseTime = INIT_PAUSE_TIME_MS; + while (keepLooping()) { + RegionServerStatusService.BlockingInterface rss = rssStub; + try { + if (rss == null) { + createRegionServerStatusStub(); + continue; + } + rss.reportProcedureDone(null, request); + // 
Log if we had to retry else don't log unless TRACE. We want to + // know if we were successful after an attempt that showed in the logs as failed. + if (tries > 0 || LOG.isTraceEnabled()) { + LOG.info("PROCEDURE REPORTED " + request); + } + return; + } catch (ServiceException se) { + IOException ioe = ProtobufUtil.getRemoteException(se); + boolean pause = + ioe instanceof ServerNotRunningYetException || ioe instanceof PleaseHoldException; + if (pause) { + // Do backoff else we flood the Master with requests. + pauseTime = ConnectionUtils.getPauseTime(INIT_PAUSE_TIME_MS, tries); + } else { + pauseTime = INIT_PAUSE_TIME_MS; // Reset. + } + LOG.info("Failed report transition " + TextFormat.shortDebugString(request) + "; retry (#" + + tries + ")" + (pause ? " after " + pauseTime + "ms delay (Master is coming online...)." + : " immediately."), + ioe); + if (pause) { + Threads.sleep(pauseTime); + } + tries++; + if (rssStub == rss) { + rssStub = null; + } + } + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index f5a35a4..9c73a08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1,5 +1,4 @@ -/* - * +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -60,9 +59,10 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.MultiActionResultTooLarge; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.PleaseHoldException; +import org.apache.hadoop.hbase.PrivateCellUtil; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; @@ -99,6 +99,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.master.MasterRpcServices; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement; import org.apache.hadoop.hbase.quotas.OperationQuota; import org.apache.hadoop.hbase.quotas.QuotaUtil; @@ -123,6 +124,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Strings; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; @@ -169,6 +171,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionR import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse.RegionOpeningState; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RemoteProcedureRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest; @@ -220,6 +223,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor; @@ -3418,23 +3423,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } @Override - public ExecuteProceduresResponse executeProcedures(RpcController controller, - ExecuteProceduresRequest request) throws ServiceException { - ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder(); - if (request.getOpenRegionCount() > 0) { - for (OpenRegionRequest req : request.getOpenRegionList()) { - builder.addOpenRegion(openRegion(controller, req)); - } - } - if (request.getCloseRegionCount() > 0) { - for (CloseRegionRequest req : request.getCloseRegionList()) { - builder.addCloseRegion(closeRegion(controller, req)); - } - } - return builder.build(); - } - - @Override public 
ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller, ClearRegionBlockCacheRequest request) throws ServiceException { @@ -3451,4 +3439,38 @@ public class RSRpcServices implements HBaseRPCErrorHandler, .setStats(ProtobufUtil.toCacheEvictionStats(stats.build())) .build(); } + + @Override + public ExecuteProceduresResponse executeProcedures(RpcController controller, + ExecuteProceduresRequest request) throws ServiceException { + if (request.getOpenRegionCount() > 0) { + for (OpenRegionRequest req : request.getOpenRegionList()) { + openRegion(controller, req); + } + } + if (request.getCloseRegionCount() > 0) { + for (CloseRegionRequest req : request.getCloseRegionList()) { + closeRegion(controller, req); + } + } + if (request.getProcCount() > 0) { + for (RemoteProcedureRequest req : request.getProcList()) { + RSProcedureCallable callable; + try { + callable = + Class.forName(req.getProcClass()).asSubclass(RSProcedureCallable.class).newInstance(); + } catch (Exception e) { + // here we just ignore the error as this should not happen and we do not provide a general + // way to report errors for all types of remote procedure. The procedure will hang at + // master side but after you solve the problem and restart master it will be executed + // again and pass. 
+ LOG.warn("create procedure of type " + req.getProcClass() + " failed, give up", e); + continue; + } + callable.init(req.getProcData().toByteArray(), regionServer); + regionServer.executeProcedure(req.getProcId(), callable); + } + } + return ExecuteProceduresResponse.getDefaultInstance(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java new file mode 100644 index 0000000..94bcfec --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RSProcedureHandler.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.handler; + +import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * An event handler for running a procedure. 
+ */ +@InterfaceAudience.Private +public class RSProcedureHandler extends EventHandler { + + private final long procId; + + private final RSProcedureCallable callable; + + public RSProcedureHandler(HRegionServer rs, long procId, RSProcedureCallable callable) { + super(rs, callable.getEventType()); + this.procId = procId; + this.callable = callable; + } + + @Override + public void process() { + Exception error = null; + try { + callable.call(); + } catch (Exception e) { + error = e; + } + ((HRegionServer) server).reportProcedureDone(procId, error); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java index 1912d11..eead71e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestAssignmentManager.java @@ -530,26 +530,16 @@ public class TestAssignmentManager { private class NoopRsExecutor implements MockRSExecutor { public ExecuteProceduresResponse sendRequest(ServerName server, ExecuteProceduresRequest request) throws IOException { - ExecuteProceduresResponse.Builder builder = ExecuteProceduresResponse.newBuilder(); if (request.getOpenRegionCount() > 0) { - for (OpenRegionRequest req: request.getOpenRegionList()) { - OpenRegionResponse.Builder resp = OpenRegionResponse.newBuilder(); - for (RegionOpenInfo openReq: req.getOpenInfoList()) { - RegionOpeningState state = execOpenRegion(server, openReq); - if (state != null) { - resp.addOpeningState(state); - } + for (OpenRegionRequest req : request.getOpenRegionList()) { + for (RegionOpenInfo openReq : req.getOpenInfoList()) { + execOpenRegion(server, openReq); } - builder.addOpenRegion(resp.build()); } } if (request.getCloseRegionCount() > 0) { - for (CloseRegionRequest req: request.getCloseRegionList()) { - CloseRegionResponse resp 
= execCloseRegion(server, - req.getRegion().getValue().toByteArray()); - if (resp != null) { - builder.addCloseRegion(resp); - } + for (CloseRegionRequest req : request.getCloseRegionList()) { + execCloseRegion(server, req.getRegion().getValue().toByteArray()); } } return ExecuteProceduresResponse.newBuilder().build(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java new file mode 100644 index 0000000..44343d7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/DummyModifyPeerProcedure.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.replication; + +import java.io.IOException; + +public class DummyModifyPeerProcedure extends ModifyPeerProcedure { + + public DummyModifyPeerProcedure() { + } + + public DummyModifyPeerProcedure(String peerId) { + super(peerId); + } + + @Override + public PeerOperationType getPeerOperationType() { + return PeerOperationType.ADD; + } + + @Override + protected boolean updatePeerStorage() throws IOException { + return true; + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java new file mode 100644 index 0000000..ec06306 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestDummyModifyPeerProcedure.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.replication; + +import static org.junit.Assert.assertTrue; + +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, LargeTests.class }) +public class TestDummyModifyPeerProcedure { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static String PEER_ID; + + private static Path DIR; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(3); + PEER_ID = "testPeer"; + DIR = new Path("/" + PEER_ID); + UTIL.getTestFileSystem().mkdirs(DIR); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws Exception { + ProcedureExecutor executor = + UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor(); + long procId = executor.submitProcedure(new DummyModifyPeerProcedure(PEER_ID)); + UTIL.waitFor(30000, new Waiter.Predicate() { + + @Override + public boolean evaluate() throws Exception { + return executor.isFinished(procId); + } + }); + Set serverNames = UTIL.getHBaseCluster().getRegionServerThreads().stream() + .map(t -> t.getRegionServer().getServerName().toString()) + .collect(Collectors.toCollection(HashSet::new)); + for (FileStatus s : UTIL.getTestFileSystem().listStatus(DIR)) { + assertTrue(serverNames.remove(s.getPath().getName())); + } + assertTrue(serverNames.isEmpty()); + } +} diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index 6791465..29c702b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -30,12 +30,14 @@ import com.google.protobuf.RpcCallback; import com.google.protobuf.RpcController; import com.google.protobuf.Service; import com.google.protobuf.ServiceException; + import java.io.IOException; import java.security.PrivilegedAction; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -120,8 +122,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.access.Permission.Action; -import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProcedureProtos; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.SecurityTests; import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; @@ -137,6 +137,9 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProcedureProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; + /** * Performs authorization checks for common operations, according to different * levels of authorized users. -- 2.7.4