From 20cab3dcb3adbeac07f4895d9a1f9a5e64c0ca29 Mon Sep 17 00:00:00 2001 From: Jingyun Tian Date: Wed, 12 Dec 2018 15:11:01 +0800 Subject: [PATCH] HBASE-21565 Delete dead server from dead server list too early leads to concurrent Server Crash Procedures(SCP) for a same server --- .../apache/hadoop/hbase/master/ServerManager.java | 25 ++++++++------- .../hbase/master/assignment/AssignmentManager.java | 28 ++++++++++++----- .../master/procedure/ServerCrashProcedure.java | 16 +++------- .../apache/hadoop/hbase/HBaseTestingUtility.java | 7 ++++- .../hadoop/hbase/master/TestRestartCluster.java | 36 ++++++++++++++++++++++ 5 files changed, 82 insertions(+), 30 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index dc76d72..86d72d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -602,19 +602,22 @@ public class ServerManager { return false; } LOG.info("Processing expiration of " + serverName + " on " + this.master.getServerName()); - master.getAssignmentManager().submitServerCrash(serverName, true); - - // Tell our listeners that a server was removed - if (!this.listeners.isEmpty()) { - for (ServerListener listener : this.listeners) { - listener.serverRemoved(serverName); + long pid = master.getAssignmentManager().submitServerCrash(serverName, true); + if(pid <= 0) { + return false; + } else { + // Tell our listeners that a server was removed + if (!this.listeners.isEmpty()) { + for (ServerListener listener : this.listeners) { + listener.serverRemoved(serverName); + } } + // trigger a persist of flushedSeqId + if (flushedSeqIdFlusher != null) { + flushedSeqIdFlusher.triggerNow(); + } + return true; } - // trigger a persist of flushedSeqId - if (flushedSeqIdFlusher != null) { - flushedSeqIdFlusher.triggerNow(); - } - return true; } @VisibleForTesting diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index a564ea9..9801693 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -1350,17 +1350,31 @@ public class AssignmentManager { // sure that, the region list fetched by SCP will not be changed any more. serverNode.writeLock().lock(); try { - serverNode.setState(ServerState.CRASHED); - carryingMeta = isCarryingMeta(serverName); ProcedureExecutor procExec = this.master.getMasterProcedureExecutor(); - pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName, - shouldSplitWal, carryingMeta)); + carryingMeta = isCarryingMeta(serverName); + // HBASE-20976 + // Check whether there is already a SCP running for the ServerName + List previousSCPs = + procExec.getProcedures().stream().filter(p -> p instanceof ServerCrashProcedure) + .map(p -> (ServerCrashProcedure) p) + .filter(p -> p.getServerName().equals(serverName) + && (p.isFinished() == false || p.isSuccess() == false)) + .collect(Collectors.toList()); + if (previousSCPs != null && previousSCPs.size() > 0) { + LOG.debug("Skip to add SCP for " + serverName + " with meta=" + carryingMeta + + " , since there are SCP(s) executing for it: " + previousSCPs); + return -1; + } else { + serverNode.setState(ServerState.CRASHED); + pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), + serverName, shouldSplitWal, carryingMeta)); + LOG.info( + "Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}", + serverName, carryingMeta, pid); + } } finally { serverNode.writeLock().unlock(); } - LOG.info( - "Added {} to dead servers which carryingMeta={}, submitted ServerCrashProcedure pid={}", - serverName, carryingMeta, pid); return pid; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index b93f8fa..05bcd28 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -333,17 +333,6 @@ public class ServerCrashProcedure return ServerOperationType.CRASH_HANDLER; } - /** - * For this procedure, yield at end of each successful flow step so that all crashed servers - * can make progress rather than do the default which has each procedure running to completion - * before we move to the next. For crashed servers, especially if running with distributed log - * replay, we will want all servers to come along; we do not want the scenario where a server is - * stuck waiting for regions to online so it can replay edits. - */ - @Override - protected boolean isYieldBeforeExecuteFromState(MasterProcedureEnv env, ServerCrashState state) { - return true; - } @Override protected boolean shouldWaitClientAck(MasterProcedureEnv env) { @@ -390,4 +379,9 @@ public class ServerCrashProcedure protected ProcedureMetrics getProcedureMetrics(MasterProcedureEnv env) { return env.getMasterServices().getMasterMetrics().getServerCrashProcMetrics(); } + + @Override + protected boolean holdLock(MasterProcedureEnv env) { + return true; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 31a7cad..bb06b68 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -1185,6 +1185,11 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { * @param servers number of region servers */ public void restartHBaseCluster(int servers) throws IOException, InterruptedException { + this.restartHBaseCluster(servers, null); + } + + public void restartHBaseCluster(int servers, List ports) + throws IOException, InterruptedException { if (hbaseAdmin != null) { hbaseAdmin.close(); hbaseAdmin = null; @@ -1193,7 +1198,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility { this.connection.close(); this.connection = null; } - this.hbaseCluster = new MiniHBaseCluster(this.conf, servers); + this.hbaseCluster = new MiniHBaseCluster(this.conf, 1, servers, ports, null, null); // Don't leave here till we've done a successful scan of the hbase:meta Connection conn = ConnectionFactory.createConnection(this.conf); Table t = conn.getTable(TableName.META_TABLE_NAME); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java index 4ba1876..afbac02 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; + import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; @@ -33,12 +35,15 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Threads; import org.junit.After; +import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -67,6 +72,37 @@ public class TestRestartCluster { } @Test + public void testClusterRestartFailOver() throws Exception { + UTIL.startMiniCluster(3); + UTIL.waitFor(60000, () -> UTIL.getMiniHBaseCluster().getMaster().isInitialized()); + TableName tableName = TABLES[0]; + ServerName testServer = UTIL.getHBaseCluster().getRegionServer(0).getServerName(); + UTIL.createMultiRegionTable(tableName, FAMILY); + UTIL.waitTableEnabled(tableName); + Table table = UTIL.getConnection().getTable(tableName); + for (int i = 0; i < 100; i++) { + UTIL.loadTable(table, FAMILY); + } + List ports = + UTIL.getHBaseCluster().getMaster().getServerManager().getOnlineServersList().stream() + .map(serverName -> serverName.getPort()).collect(Collectors.toList()); + LOG.info("Shutting down cluster"); + UTIL.getHBaseCluster().killAll(); + UTIL.getHBaseCluster().waitUntilShutDown(); + LOG.info("Starting cluster the second time"); + UTIL.restartHBaseCluster(3, ports); + UTIL.waitFor(10000, () -> UTIL.getHBaseCluster().getMaster().isInitialized()); + LOG.info("start to find the procedure of SCP for the severName we choose"); + UTIL.waitFor(20000, + () -> UTIL.getHBaseCluster().getMaster().getProcedures().stream() + .anyMatch(procedure -> (procedure instanceof ServerCrashProcedure) + && ((ServerCrashProcedure) procedure).getServerName().equals(testServer))); + LOG.info("start to submit the SCP for the same serverName {} which should fail", testServer); + Assert.assertFalse( + UTIL.getHBaseCluster().getMaster().getServerManager().expireServer(testServer)); + } + + @Test public void testClusterRestart() throws Exception { UTIL.startMiniCluster(3); while (!UTIL.getMiniHBaseCluster().getMaster().isInitialized()) { -- 2.7.4