From ea3fc7d587addfa5a840f4d1702e6a11f4ea568e Mon Sep 17 00:00:00 2001 From: jingyuntian Date: Wed, 24 Oct 2018 17:22:19 +0800 Subject: [PATCH] HBASE-21322 Add a scheduleServerCrashProcedure() API to HbckService --- .../org/apache/hadoop/hbase/client/HBaseHbck.java | 13 ++++ .../java/org/apache/hadoop/hbase/client/Hbck.java | 2 + .../hbase/shaded/protobuf/RequestConverter.java | 7 +++ .../src/main/protobuf/Master.proto | 12 ++++ .../hadoop/hbase/master/MasterRpcServices.java | 70 ++++++++++++++++++++++ .../hbase/master/assignment/AssignmentManager.java | 11 ++-- .../org/apache/hadoop/hbase/client/TestHbck.java | 39 ++++++++++++ 7 files changed, 149 insertions(+), 5 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java index 2d08825..76ead51 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseHbck.java @@ -154,4 +154,17 @@ public class HBaseHbck implements Hbck { }); return response.getBypassedList(); } + + @Override + public List scheduleServerCrashProcedure(List serverNames) throws IOException { + try { + MasterProtos.ScheduleServerCrashProcedureResponse response = + this.hbck.scheduleServerCrashProcedure(rpcControllerFactory.newController(), + RequestConverter.toScheduleServerCrashProcedureRequest(serverNames)); + return response.getPidList(); + } catch (ServiceException se) { + LOG.debug(toCommaDelimitedString(serverNames), se); + throw new IOException(se); + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Hbck.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Hbck.java index 5c97d97..a309ddb 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Hbck.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Hbck.java @@ -101,4 +101,6 @@ public interface Hbck extends Abortable, Closeable { */ List bypassProcedure(List pids, long waitTime, boolean override, boolean recursive) throws IOException; + + List scheduleServerCrashProcedure(List serverNames) throws IOException; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 1253974..9cf5860 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -1906,6 +1906,13 @@ public final class RequestConverter { setOverride(override).build(); } + public static MasterProtos.ScheduleServerCrashProcedureRequest + toScheduleServerCrashProcedureRequest(List serverNames) { + MasterProtos.ScheduleServerCrashProcedureRequest.Builder b = + MasterProtos.ScheduleServerCrashProcedureRequest.newBuilder(); + return b.addAllServerName(serverNames).build(); + } + private static List toEncodedRegionNameRegionSpecifiers( List encodedRegionNames) { return encodedRegionNames.stream(). diff --git a/hbase-protocol-shaded/src/main/protobuf/Master.proto b/hbase-protocol-shaded/src/main/protobuf/Master.proto index 8318da3..d9ea124 100644 --- a/hbase-protocol-shaded/src/main/protobuf/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/Master.proto @@ -1046,6 +1046,14 @@ message BypassProcedureResponse { repeated bool bypassed = 1; } +message ScheduleServerCrashProcedureRequest { + repeated string serverName = 1; +} + +message ScheduleServerCrashProcedureResponse { + repeated uint64 pid = 1; +} + service HbckService { /** Update state of the table in meta only*/ rpc SetTableStateInMeta(SetTableStateInMetaRequest) @@ -1072,4 +1080,8 @@ service HbckService { /** Bypass a procedure to completion, procedure is completed but no actual work is done*/ rpc BypassProcedure(BypassProcedureRequest) returns(BypassProcedureResponse); + + /** Schedule a ServerCrashProcedure to help recover a crash server */ + rpc ScheduleServerCrashProcedure(ScheduleServerCrashProcedureRequest) + returns(ScheduleServerCrashProcedureResponse); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index a6bc086..c152518 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -18,6 +18,8 @@ */ package org.apache.hadoop.hbase.master; +import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER; + import java.io.IOException; import java.net.BindException; import java.net.InetAddress; @@ -31,6 +33,7 @@ import java.util.Map.Entry; import java.util.Set; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.ClusterMetricsBuilder; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; @@ -64,8 +67,10 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.master.locking.LockProcedure; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil.NonceProcedureRunnable; +import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; import org.apache.hadoop.hbase.mob.MobUtils; import org.apache.hadoop.hbase.procedure.MasterProcedureManager; import org.apache.hadoop.hbase.procedure2.LockType; @@ -96,6 +101,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; @@ -2447,4 +2453,68 @@ public class MasterRpcServices extends RSRpcServices throw new ServiceException(e); } } + + @Override + public MasterProtos.ScheduleServerCrashProcedureResponse scheduleServerCrashProcedure( + RpcController controller, MasterProtos.ScheduleServerCrashProcedureRequest request) + throws ServiceException { + List serverNames = request.getServerNameList(); + List pids = new ArrayList<>(); + try { + for (String serverName : serverNames) { + ServerName server = parseServerName(serverName); + if (shouldSubmitSCP(server)) { + ProcedureExecutor procExec = this.master.getMasterProcedureExecutor(); + pids.add(procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), + server, true, containMetaWals(server)))); + } else { + pids.add(-1L); + } + } + return MasterProtos.ScheduleServerCrashProcedureResponse.newBuilder().addAllPid(pids).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + private boolean containMetaWals(ServerName serverName) throws IOException { + Path logDir = new Path(master.getWALRootDir(), + AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); + Path splitDir = logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); + return master.getFileSystem().listStatus(splitDir, META_FILTER).length > 0; + } + + private ServerName parseServerName(String serverName) throws IOException { + String[] parts = serverName.split(","); + if (parts.length != 3) { + throw new IOException(serverName + + ": Bad format of ServerName, which should be [hostName,port,startCode]"); + } + return ServerName.valueOf(parts[0], Integer.parseInt(parts[1]), Long.parseLong(parts[2])); + } + + private boolean shouldSubmitSCP(ServerName serverName) throws IOException { + // check if there is serverName-splitting directory under the WALs directory + Path logDir = new Path(master.getWALRootDir(), + AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); + Path splitDir = logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); + if (!master.getFileSystem().exists(splitDir)) { + LOG.info("there is no splitting directory {} for this server {}, no SCP is required", + splitDir, serverName); + return false; + } + // check if there is already a SCP of this server running + List> procedures = + master.getMasterProcedureExecutor().getProcedures(); + for (Procedure procedure : procedures) { + if (procedure instanceof ServerCrashProcedure) { + if (serverName.compareTo(((ServerCrashProcedure) procedure).getServerName()) == 0) { + LOG.info("there is already a SCP of this server {} running, pid {}", serverName, + procedure.getProcId()); + return false; + } + } + } + return true; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java index 8b5bb6c..ab9b99a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java @@ -1361,13 +1361,14 @@ public class AssignmentManager implements ServerListener { return 0; } - public void submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) { + public long submitServerCrash(final ServerName serverName, final boolean shouldSplitWal) { boolean carryingMeta = isCarryingMeta(serverName); ProcedureExecutor procExec = this.master.getMasterProcedureExecutor(); - procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), serverName, - shouldSplitWal, carryingMeta)); - LOG.debug("Added=" + serverName + - " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta); + long pid = procExec.submitProcedure(new ServerCrashProcedure(procExec.getEnvironment(), + serverName, shouldSplitWal, carryingMeta)); + LOG.debug("Added=" + serverName + + " to dead servers, submitted shutdown handler to be executed meta=" + carryingMeta); + return pid; } public void offlineRegion(final RegionInfo regionInfo) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java index 7680845..61881f1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHbck.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.client; import static junit.framework.TestCase.assertTrue; +import static org.apache.hadoop.hbase.HConstants.HREGION_LOGDIR_NAME; +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.SPLITTING_EXT; import static org.junit.Assert.assertEquals; import java.io.IOException; @@ -26,8 +28,12 @@ import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; @@ -36,10 +42,13 @@ import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.ClassRule; @@ -173,6 +182,36 @@ public class TestHbck { } } + @Test + public void testScheduleSCP() throws Exception { + HRegionServer testRs = TEST_UTIL.getRSForFirstRegionInTable(TABLE_NAME); + TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), Bytes.toBytes("family1"), + true); + ServerName serverName = testRs.getServerName(); + FileSystem fileSystem = TEST_UTIL.getHBaseCluster().getMaster().getFileSystem(); + Path walsDir = + new Path(CommonFSUtils.getWALRootDir(TEST_UTIL.getConfiguration()), HREGION_LOGDIR_NAME); + Hbck hbck = TEST_UTIL.getHbck(); + List pids = hbck.scheduleServerCrashProcedure(Arrays.asList(serverName.toString())); + assertTrue(pids.get(0) == -1); + LOG.info("pid is {}", pids.get(0)); + + // rename wal dir to splitting dir + Path logDir = new Path(walsDir, serverName.toString()); + fileSystem.rename(logDir, logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT)); + + FileStatus[] splittingDirs = + fileSystem.listStatus(walsDir, path -> path.getName().endsWith(SPLITTING_EXT)); + assertTrue(splittingDirs.length > 0); + pids = hbck.scheduleServerCrashProcedure(Arrays.asList(serverName.toString())); + assertTrue(pids.get(0) > 0); + LOG.info("pid is {}", pids.get(0)); + pids = hbck.scheduleServerCrashProcedure(Arrays.asList(serverName.toString())); + assertTrue(pids.get(0) == -1); + LOG.info("pid is {}", pids.get(0)); + + } + private void waitOnPids(List pids) { for (Long pid: pids) { while (!TEST_UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor(). -- 2.7.4