From 870ad49ce16758b4168ac772f648ab1f023606ea Mon Sep 17 00:00:00 2001
From: Jingyun Tian
Date: Mon, 10 Dec 2018 11:26:46 +0800
Subject: [PATCH] HBASE-21565 Delete dead server from dead server list too
 early leads to concurrent Server Crash Procedures(SCP) for a same server

---
 .../apache/hadoop/hbase/master/ServerManager.java  |  4 +-
 .../hbase/master/assignment/AssignmentManager.java | 46 +++++++++++++-------
 .../master/procedure/ServerCrashProcedure.java     |  5 +++
 .../apache/hadoop/hbase/HBaseTestingUtility.java   |  7 +++-
 .../hadoop/hbase/master/TestRestartCluster.java    | 49 ++++++++++++++++++++++
 5 files changed, 93 insertions(+), 18 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
index dc76d72..48c95db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
@@ -602,7 +602,7 @@ public class ServerManager {
       return false;
     }
     LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName());
-    master.getAssignmentManager().submitServerCrash(serverName, true);
+    long pid = master.getAssignmentManager().submitServerCrash(serverName, true);
 
     // Tell our listeners that a server was removed
     if (!this.listeners.isEmpty()) {
@@ -614,7 +614,7 @@ public class ServerManager {
     if (flushedSeqIdFlusher != null) {
       flushedSeqIdFlusher.triggerNow();
     }
-    return true;
+    return pid > 0;
   }
 
   @VisibleForTesting
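Note on the ServerManager change above: expireServer() now returns true only when a ServerCrashProcedure was actually submitted (pid > 0), so a caller can tell a scheduled expiration apart from one that was skipped. A rough caller-side sketch of that contract; the class and method names here are illustrative and not part of the patch:

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.ServerManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative sketch only; not HBase code.
public class ExpireServerCallerSketch {
  private static final Logger LOG = LoggerFactory.getLogger(ExpireServerCallerSketch.class);

  static boolean expireAndReport(ServerManager serverManager, ServerName serverName) {
    boolean scheduled = serverManager.expireServer(serverName);
    if (scheduled) {
      LOG.info("ServerCrashProcedure submitted for {}", serverName);
    } else {
      // Either the server was unknown/already expired, or an unfinished SCP for this
      // ServerName already exists, so no new procedure was submitted.
      LOG.warn("No new ServerCrashProcedure submitted for {}", serverName);
    }
    return scheduled;
  }
}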
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
index a564ea9..d761cab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java
@@ -1342,21 +1342,37 @@ public class AssignmentManager {
 
   public long submitServerCrash(ServerName serverName, boolean shouldSplitWal) {
     boolean carryingMeta;
-    long pid;
-    ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
-    // we hold the write lock here for fencing on reportRegionStateTransition. Once we set the
-    // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
-    // this server. This is used to simplify the implementation for TRSP and SCP, where we can make
-    // sure that, the region list fetched by SCP will not be changed any more.
-    serverNode.writeLock().lock();
-    try {
-      serverNode.setState(ServerState.CRASHED);
-      carryingMeta = isCarryingMeta(serverName);
-      ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
-      pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName,
-        shouldSplitWal, carryingMeta));
-    } finally {
-      serverNode.writeLock().unlock();
+    long pid = -1;
+    ProcedureExecutor<MasterProcedureEnv> procExec = this.master.getMasterProcedureExecutor();
+    carryingMeta = isCarryingMeta(serverName);
+    // HBASE-20976
+    // Check whether there is already a SCP running for the ServerName
+    List<ServerCrashProcedure> previousSCPs =
+        procExec.getProcedures().stream().filter(p -> p instanceof ServerCrashProcedure)
+            .map(p -> (ServerCrashProcedure) p)
+            .filter(p -> p.getServerName().equals(serverName)
+                && (p.isFinished() == false || p.isSuccess() == false))
+            .collect(Collectors.toList());
+    if (previousSCPs == null || previousSCPs.isEmpty()) {
+      // we hold the write lock here for fencing on reportRegionStateTransition. Once we set the
+      // server state to CRASHED, we will no longer accept the reportRegionStateTransition call from
+      // this server. This is used to simplify the implementation for TRSP and SCP, where we can
+      // make
+      // sure that, the region list fetched by SCP will not be changed any more.
+      ServerStateNode serverNode = regionStates.getOrCreateServer(serverName);
+      serverNode.writeLock().lock();
+      try {
+        serverNode.setState(ServerState.CRASHED);
+        pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(),
+            serverName, shouldSplitWal, carryingMeta));
+        LOG.debug("Added=" + serverName +
+            " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta);
+      } finally {
+        serverNode.writeLock().unlock();
+      }
+    } else {
+      LOG.debug("Skip to add SCP for " + serverName + " with meta=" + carryingMeta +
+          ", since there are SCP(s) executing for it: " + previousSCPs);
     }
     LOG.info(
       "Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}",
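The heart of the fix, restated outside the diff for readability: before submitting a new SCP, submitServerCrash() now asks the ProcedureExecutor whether an SCP for the same ServerName is still running or finished without success, and only submits when none is found. A standalone sketch of that guard; the class and method names are illustrative, while getProcedures(), getServerName(), isFinished() and isSuccess() are the calls the patch itself uses:

import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;

// Illustrative sketch only; mirrors the filter added to AssignmentManager#submitServerCrash.
public final class ScpGuardSketch {

  /**
   * Returns true if an SCP for the given server is still running, or finished without
   * success, in which case a new SCP should not be submitted for that server.
   */
  static boolean hasUnfinishedSCP(ProcedureExecutor<MasterProcedureEnv> procExec,
      ServerName serverName) {
    List<ServerCrashProcedure> previousSCPs = procExec.getProcedures().stream()
        .filter(p -> p instanceof ServerCrashProcedure)
        .map(p -> (ServerCrashProcedure) p)
        .filter(p -> p.getServerName().equals(serverName)
            && (!p.isFinished() || !p.isSuccess()))
        .collect(Collectors.toList());
    return !previousSCPs.isEmpty();
  }
}

Together with the holdLock() override in the next file, this keeps a repeated expiration of the same ServerName from queueing a concurrent SCP.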
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index b93f8fa..285a1e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -390,4 +390,9 @@ public class ServerCrashProcedure
   protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) {
     return env.getMasterServices().getMasterMetrics().getServerCrashProcMetrics();
   }
+
+  @Override
+  protected boolean holdLock(MasterProcedureEnv env) {
+    return true;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 31a7cad..bb06b68 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1185,6 +1185,11 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
    * @param servers number of region servers
    */
   public void restartHBaseCluster(int servers) throws IOException, InterruptedException {
+    this.restartHBaseCluster(servers, null);
+  }
+
+  public void restartHBaseCluster(int servers, List<Integer> ports)
+      throws IOException, InterruptedException {
     if (hbaseAdmin != null) {
       hbaseAdmin.close();
       hbaseAdmin = null;
@@ -1193,7 +1198,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
       this.connection.close();
       this.connection = null;
     }
-    this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
+    this.hbaseCluster = new MiniHBaseCluster(this.conf, 1, servers, ports, null, null);
     // Don't leave here till we've done a successful scan of the hbase:meta
     Connection conn = ConnectionFactory.createConnection(this.conf);
     Table t = conn.getTable(TableName.META_TABLE_NAME);
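The new restartHBaseCluster(int, List<Integer>) overload above lets a test restart the region servers on the ports they used before the shutdown, so the previous ServerNames (same host and port, old start code) come back as dead servers that the failed-over master has to handle with SCPs. A minimal usage sketch, assuming a fresh HBaseTestingUtility; the class name is illustrative, and the cluster calls are the same ones the new test below relies on:

import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HBaseTestingUtility;

// Illustrative sketch only; shows the intended use of restartHBaseCluster(servers, ports).
public class RestartOnSamePortsSketch {
  public static void main(String[] args) throws Exception {
    HBaseTestingUtility util = new HBaseTestingUtility();
    util.startMiniCluster(3);

    // Remember the ports the current region servers are bound to.
    List<Integer> ports = util.getHBaseCluster().getMaster().getServerManager()
        .getOnlineServersList().stream()
        .map(serverName -> serverName.getPort())
        .collect(Collectors.toList());

    // Kill the cluster, then bring the same number of region servers back on the
    // remembered ports so the old ServerNames have to be expired via SCPs.
    util.getHBaseCluster().killAll();
    util.getHBaseCluster().waitUntilShutDown();
    util.restartHBaseCluster(3, ports);

    util.shutdownMiniCluster();
  }
}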
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
index 4ba1876..75a6bf6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -32,13 +34,17 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -67,6 +73,49 @@ public class TestRestartCluster {
   }
 
   @Test
+  public void testClusterRestartFailOver() throws Exception {
+    UTIL.startMiniCluster(3);
+    while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) {
+      Threads.sleep(1);
+    }
+    TableName tableName = TABLES[0];
+    ServerName testServer = UTIL.getHBaseCluster().getRegionServer(0).getServerName();
+    UTIL.createMultiRegionTable(tableName, FAMILY);
+    UTIL.waitTableEnabled(tableName);
+    Table table = UTIL.getConnection().getTable(tableName);
+
+    for (int i = 0; i < 100; i++) {
+      UTIL.loadTable(table, FAMILY);
+    }
+
+    List<Integer> ports =
+        UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream()
+            .map(serverName -> serverName.getPort()).collect(Collectors.toList());
+    LOG.info("Shutting down cluster");
+    UTIL.getHBaseCluster().killAll();
+
+    LOG.info("Sleeping a bit");
+    Thread.sleep(2000);
+
+    UTIL.getHBaseCluster().waitUntilShutDown();
+
+    LOG.info("Starting cluster the second time");
+    UTIL.restartHBaseCluster(3, ports);
+    Waiter.waitFor(UTIL.getConfiguration(), 10000,
+        () -> UTIL.getHBaseCluster().getMaster().isInitialized());
+    Thread.sleep(500);
+    LOG.info("start to find the procedure of SCP for the severName we choose");
+    Waiter.waitFor(UTIL.getConfiguration(), 20000,
+        () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream()
+            .anyMatch(procedure -> (procedure instanceof ServerCrashProcedure)
+                && ((ServerCrashProcedure) procedure).getServerName().compareTo(testServer) == 0));
+    LOG.info("start to submit the SCP for the same serverName {} which should fail", testServer);
+    Assert.assertFalse(
+        UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer));
+    Thread.sleep(20000);
+  }
+
+  @Test
   public void testClusterRestart() throws Exception {
     UTIL.startMiniCluster(3);
     while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) {
-- 
2.7.4