From 22210913a058f2ee8352fcaad12bb0223af94fd7 Mon Sep 17 00:00:00 2001 From: Jingyun Tian Date: Thu, 20 Dec 2018 18:18:55 +0800 Subject: [PATCH] HBASE-21588 Procedure v2 wal splitting implementation --- .../java/org/apache/hadoop/hbase/HConstants.java | 4 + .../src/main/protobuf/MasterProcedure.proto | 25 ++ .../apache/hadoop/hbase/executor/EventType.java | 9 +- .../apache/hadoop/hbase/executor/ExecutorType.java | 3 +- .../org/apache/hadoop/hbase/master/HMaster.java | 5 + .../hadoop/hbase/master/MasterWalManager.java | 80 +++++- .../hadoop/hbase/master/SplitWALManager.java | 117 ++++++++ .../master/procedure/ServerCrashProcedure.java | 72 ++++- .../master/procedure/ServerProcedureInterface.java | 11 +- .../hadoop/hbase/master/procedure/ServerQueue.java | 3 + .../hbase/master/procedure/SplitWALProcedure.java | 189 +++++++++++++ .../hbase/master/procedure/SplitWALQueue.java | 35 +++ .../master/procedure/SplitWALRemoteProcedure.java | 179 +++++++++++++ .../hadoop/hbase/regionserver/HRegionServer.java | 16 +- .../hbase/regionserver/SplitWALCallable.java | 294 +++++++++++++++++++++ .../regionserver/WrongTargetServerException.java | 37 +++ .../hadoop/hbase/master/TestMasterWalManager.java | 258 ++++++++++++++++++ .../hadoop/hbase/master/TestSplitWALManager.java | 161 +++++++++++ .../master/procedure/TestServerCrashProcedure.java | 22 +- .../master/procedure/TestSplitWALProcedure.java | 130 +++++++++ 20 files changed, 1636 insertions(+), 14 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALQueue.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/WrongTargetServerException.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSplitWALProcedure.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index fdc3d82..4662404 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1311,6 +1311,10 @@ public final class HConstants { public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL = "hbase.client.fast.fail.interceptor.impl"; + public static final String HBASE_SPLIT_WAL_COORDINATED_BY_ZK = "hbase.split.wal.zk.coordinated"; + + public static final boolean DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK = true; + /** Config key for if the server should send backpressure and if the client should listen to * that backpressure from the server */ public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled"; diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index cc0c6ba..3021ae1 100644 --- 
a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -308,6 +308,8 @@ enum ServerCrashState { SERVER_CRASH_WAIT_ON_ASSIGN = 9; SERVER_CRASH_SPLIT_META_LOGS = 10; SERVER_CRASH_ASSIGN_META = 11; + SERVER_CRASH_SPLIT_META_WAL_CHECK = 12; + SERVER_CRASH_SPLIT_WAL_CHECK = 13; SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true]; SERVER_CRASH_FINISH = 100; } @@ -551,3 +553,26 @@ message OpenRegionProcedureStateData { message CloseRegionProcedureStateData { optional ServerName assign_candidate = 1; } + +message SplitWALParameter { + required ServerName target_server = 1; + required string wal_path = 2; +} + + +message SplitWALData { + required string wal_path = 1; + required ServerName crashed_server = 2; + optional ServerName worker = 3; +} + +message SplitWALRemoteData { + required ServerName worker = 1; + required string wal_path = 2; +} + +enum SplitWALState { + ACQUIRE_SPLIT_WAL_WORKER = 1; + DISPATCH_WAL_TO_WORKER = 2; + RELEASE_SPLIT_WORKER = 3; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index ad38d1c..4fdb005 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -288,7 +288,14 @@ public enum EventType { * * RS_REPLAY_SYNC_REPLICATION_WAL */ - RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL); + RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL), + + /** + * RS splits WALs under the splitting dir + * + * RS_SPLIT_WAL + */ + RS_SPLIT_WAL(86, ExecutorType.RS_SPLIT_WAL); private final int code; private final ExecutorType executor; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index ea97354..b30faec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -48,7 +48,8 @@ public enum ExecutorType { RS_COMPACTED_FILES_DISCHARGER (29), RS_OPEN_PRIORITY_REGION (30), RS_REFRESH_PEER(31), - RS_REPLAY_SYNC_REPLICATION_WAL(32); + RS_REPLAY_SYNC_REPLICATION_WAL(32), + RS_SPLIT_WAL(33); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0bcef59..3ffd5b6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.master; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; import com.google.protobuf.Descriptors; import com.google.protobuf.Service; @@ -945,6 +947,9 @@ public class HMaster extends HRegionServer implements MasterServices { // loading.
this.serverManager = createServerManager(this); this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this); + if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + this.walManager.getSplitWALManager().registerSplitWorkerAssigner(); + } createProcedureExecutor(); @SuppressWarnings("rawtypes") Map<Class<?>, List<Procedure<MasterProcedureEnv>>> procsByType = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java index 5ab1c28..2de3a4c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.Stream; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -35,6 +36,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; @@ -44,6 +48,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; /** * This class abstracts a bunch of operations the HMaster needs @@ -60,7 +65,8 @@ public class MasterWalManager { } }; - final static PathFilter NON_META_FILTER = new PathFilter() { + @VisibleForTesting + public final static PathFilter NON_META_FILTER = new PathFilter() { @Override public boolean accept(Path p) { return !AbstractFSWALProvider.isMetaFile(p); } }; @@ -83,6 +89,7 @@ public class MasterWalManager { // create the split log lock private final Lock splitLogLock = new ReentrantLock(); private final SplitLogManager splitLogManager; + private SplitWALManager splitWALManager = null; // Is the filesystem ok? private volatile boolean fsOk = true; @@ -99,8 +106,10 @@ public class MasterWalManager { this.rootDir = rootDir; this.services = services; this.splitLogManager = new SplitLogManager(services, conf); - this.oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME); + if (!conf.getBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + splitWALManager = new SplitWALManager(services, conf); + } } public void stop() { @@ -114,6 +123,11 @@ public class MasterWalManager { return this.splitLogManager; } + + SplitWALManager getSplitWALManager() { + return splitWALManager; + } + /** * Get the directory where old logs go * @return the dir */ @@ -396,4 +410,66 @@ } + public Procedure[] splitLogs(ServerName crashedServer, boolean splitMeta) + throws IOException { + List<Procedure> walSplittingProcedures = null; + try { + // 1. list all splitting files + List<FileStatus> splittingFiles = getWALsToSplit(crashedServer, splitMeta); + // 2.
create corresponding procedures + walSplittingProcedures = splitWALManager.createSplitWALProcedures(splittingFiles, crashedServer); + } catch (IOException e) { + LOG.error("Failed to create procedures for splitting WALs of {}", crashedServer, e); + throw e; + } + + return walSplittingProcedures == null ? new Procedure[0] + : walSplittingProcedures.toArray(new Procedure[walSplittingProcedures.size()]); + } + + public List<FileStatus> getWALsToSplit(ServerName serverName, boolean splitMeta) + throws IOException { + List<Path> logDirs = getLogDirs(Collections.singleton(serverName)); + FileStatus[] fileStatuses = + SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER); + LOG.info("Size of WALs of {} is {}", serverName, fileStatuses.length); + return Lists.newArrayList(fileStatuses); + } + + private Path getWALSplitDir(ServerName serverName) { + Path logDir = + new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); + return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); + } + + public void deleteSplitWal(String wal) { + try { + fs.delete(new Path(wal), false); + } catch (IOException e) { + LOG.error("Failed to remove WAL {}, ignoring", wal, e); + } + } + + public void deleteWALDir(ServerName serverName) { + Path splitDir = getWALSplitDir(serverName); + try { + fs.delete(splitDir, false); + } catch (IOException e) { + LOG.warn("Failed to remove WAL dir {}, ignoring", splitDir, e); + } + + } + + public ServerName acquireSplitWALWorker(Procedure<?> procedure) throws ProcedureSuspendedException { + return splitWALManager.acquireSplitWorker(procedure); + } + + public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) { + splitWALManager.releaseSplitWorker(worker, scheduler); + } + + public boolean isSplitWALFinished(String wal) throws IOException { + Path walPath = new Path(rootDir, wal); + return !fs.exists(walPath); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java new file mode 100644 index 0000000..e75efd6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java @@ -0,0 +1,134 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; +import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Create {@link SplitWALProcedure} for each WAL that needs to be split. + * + * Manage the workers for each {@link SplitWALProcedure}. Total number of workers is + * (number of online servers) * (hbase.regionserver.wal.max.splitters). + * {@link SplitWorkerAssigner} helps assign and release workers for split tasks.
+ * + */ +@InterfaceAudience.Private +public class SplitWALManager { + private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class); + + private final MasterServices server; + private final SplitWorkerAssigner splitWorkerAssigner; + + public SplitWALManager(MasterServices master, Configuration conf) { + this.server = master; + this.splitWorkerAssigner = + new SplitWorkerAssigner(server, conf.getInt("hbase.regionserver.wal.max.splitters", 3)); + + } + + public List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs, + ServerName crashedServer) { + return splittingWALs.stream() + .map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer)) + .collect(Collectors.toList()); + } + + public ServerName acquireSplitWorker(Procedure<?> procedure) throws ProcedureSuspendedException { + Optional<ServerName> worker = splitWorkerAssigner.acquire(); + if (worker.isPresent()) { + return worker.get(); + } + splitWorkerAssigner.suspend(procedure); + throw new ProcedureSuspendedException(); + } + + public void releaseSplitWorker(ServerName worker, MasterProcedureScheduler scheduler) { + splitWorkerAssigner.release(worker); + splitWorkerAssigner.wake(scheduler); + } + + public void registerSplitWorkerAssigner() { + this.splitWorkerAssigner.registerListener(); + } + + public static final class SplitWorkerAssigner implements ServerListener { + private int maxSplitTasks; + private final ProcedureEvent<?> event; + private Map<ServerName, Integer> currentWorkers = new HashMap<>(); + private MasterServices master; + + public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) { + this.maxSplitTasks = maxSplitTasks; + this.master = master; + this.event = new ProcedureEvent<>("split-WAL-worker-assigning"); + } + + public void registerListener() { + master.getServerManager().registerListener(this); + } + + public synchronized Optional<ServerName> acquire() { + Optional<ServerName> worker = master.getServerManager().getOnlineServers().keySet().stream() + .filter(serverName -> !currentWorkers.containsKey(serverName) + || currentWorkers.get(serverName) > 0) + .findAny(); + if (worker.isPresent()) { + currentWorkers.compute(worker.get(), (serverName, + availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); + } + LOG.debug("Acquired split WAL worker {}", worker); + return worker; + } + + public synchronized void release(ServerName serverName) { + currentWorkers.compute(serverName, (k, v) -> v == null ?
null : v + 1); + LOG.debug("Released split WAL worker {}", serverName); + } + + public void suspend(Procedure<?> proc) { + event.suspend(); + event.suspendIfNotReady(proc); + } + + public void wake(MasterProcedureScheduler scheduler) { + if (!event.isReady()) { + event.wake(scheduler); + } + } + + @Override + public void serverAdded(ServerName serverName) { + this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index 05bcd28..6cff5d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -17,10 +17,14 @@ */ package org.apache.hadoop.hbase.master.procedure; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; + import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; @@ -31,6 +35,7 @@ import org.apache.hadoop.hbase.master.MasterWalManager; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.assignment.RegionStateNode; import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; @@ -107,6 +112,7 @@ public class ServerCrashProcedure protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state) throws ProcedureSuspendedException, ProcedureYieldException { final MasterServices services = env.getMasterServices(); + final AssignmentManager am = env.getAssignmentManager(); // HBASE-14802 // If we have not yet notified that we are processing a dead server, we should do so now.
if (!notifiedDeadServer) { @@ -117,6 +123,7 @@ switch (state) { case SERVER_CRASH_START: case SERVER_CRASH_SPLIT_META_LOGS: + case SERVER_CRASH_SPLIT_META_WAL_CHECK: case SERVER_CRASH_ASSIGN_META: break; default: @@ -137,8 +144,24 @@ } break; case SERVER_CRASH_SPLIT_META_LOGS: - splitMetaLogs(env); - setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META); + if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + splitMetaLogs(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META); + } else { + am.getRegionStates().metaLogSplitting(serverName); + addChildProcedure(createSplittingWalProcedures(env, true)); + setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_WAL_CHECK); + } + break; + case SERVER_CRASH_SPLIT_META_WAL_CHECK: + if (isSplittingDone(env, true)) { + cleanupSplitDir(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META); + am.getRegionStates().metaLogSplit(serverName); + } else { + setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS); + } break; case SERVER_CRASH_ASSIGN_META: assignRegions(env, Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO)); @@ -156,8 +179,24 @@ } break; case SERVER_CRASH_SPLIT_LOGS: - splitLogs(env); - setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + splitLogs(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + } else { + am.getRegionStates().logSplitting(this.serverName); + addChildProcedure(createSplittingWalProcedures(env, false)); + setNextState(ServerCrashState.SERVER_CRASH_SPLIT_WAL_CHECK); + } + break; + case SERVER_CRASH_SPLIT_WAL_CHECK: + if (isSplittingDone(env, false)) { + cleanupSplitDir(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + am.getRegionStates().logSplit(this.serverName); + } else { + setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS); + } break; case SERVER_CRASH_ASSIGN: // If no regions to assign, skip assign and skip to the finish.
@@ -179,6 +218,7 @@ public class ServerCrashProcedure setNextState(ServerCrashState.SERVER_CRASH_FINISH); break; case SERVER_CRASH_FINISH: + LOG.info("Removing crashed server {} after splitting is done", serverName); services.getAssignmentManager().getRegionStates().removeServer(serverName); services.getServerManager().getDeadServers().finish(serverName); return Flow.NO_MORE_STATE; @@ -191,6 +231,30 @@ return Flow.HAS_MORE_STATE; } + private void cleanupSplitDir(MasterProcedureEnv env) { + MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); + mwm.deleteWALDir(serverName); + } + + private boolean isSplittingDone(MasterProcedureEnv env, boolean splitMeta) { + LOG.info("Start to check if splitting of {} is finished", serverName); + MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); + try { + return mwm.getWALsToSplit(serverName, splitMeta).isEmpty(); + } catch (IOException e) { + LOG.error("Get WAL file list of {} failed", serverName, e); + return false; + } + } + + private Procedure[] createSplittingWalProcedures(MasterProcedureEnv env, boolean splitMeta) + throws IOException { + LOG.info("Splitting WALs {}, isMeta: {}", this, splitMeta); + MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); + Procedure[] procedures = mwm.splitLogs(serverName, splitMeta); + return procedures; + } + private boolean filterDefaultMetaRegions() { if (regionsOnCrashedServer == null) { return false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java index f3c10ef..6e3c91b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java @@ -27,7 +27,16 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public interface ServerProcedureInterface { public enum ServerOperationType { - CRASH_HANDLER + CRASH_HANDLER, + /** + * Help find an available region server as a worker and manage the split WAL task flow. + */ + SPLIT_WAL, + + /** + * Send the split WAL request to the region server and handle the response. + */ + SPLIT_WAL_REMOTE } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java index 3a1b3c4..42a46a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java @@ -35,6 +35,9 @@ class ServerQueue extends Queue<ServerName> { switch (spi.getServerOperationType()) { case CRASH_HANDLER: return true; + case SPLIT_WAL: + case SPLIT_WAL_REMOTE: + return false; default: break; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java new file mode 100644 index 0000000..0cf56b7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java @@ -0,0 +1,188 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.MasterWalManager; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * The procedure is to split a WAL. It will get an available region server and + * schedule a {@link SplitWALRemoteProcedure} to actually send the request to the region + * server to split this WAL. + * It also checks if the split WAL task really succeeded. If the WAL still exists, it will + * schedule another region server to split this WAL.
+ */ +@InterfaceAudience.Private +public class SplitWALProcedure + extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState> + implements ServerProcedureInterface { + private static final Logger LOG = LoggerFactory.getLogger(SplitWALProcedure.class); + private String walPath; + private ServerName worker; + private ServerName crashedServer; + private int attempts = 0; + + public SplitWALProcedure() { + } + + public SplitWALProcedure(String walPath, ServerName crashedServer) { + this.walPath = walPath; + this.crashedServer = crashedServer; + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + MasterWalManager walManager = env.getMasterServices().getMasterWalManager(); + switch (state) { + case ACQUIRE_SPLIT_WAL_WORKER: + worker = walManager.acquireSplitWALWorker(this); + setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER); + return Flow.HAS_MORE_STATE; + case DISPATCH_WAL_TO_WORKER: + assert worker != null; + addChildProcedure(new SplitWALRemoteProcedure(worker, walPath)); + setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER); + return Flow.HAS_MORE_STATE; + case RELEASE_SPLIT_WORKER: + boolean finished; + try { + finished = walManager.isSplitWALFinished(walPath); + } catch (IOException ioe) { + long backoff = ProcedureUtil.getBackoffTimeMs(attempts++); + LOG.warn("Failed to check whether splitting of WAL {} succeeded, wait {} seconds to retry", + walPath, backoff / 1000, ioe); + throw suspend(backoff); + } + walManager.releaseSplitWALWorker(worker, env.getProcedureScheduler()); + if (!finished) { + LOG.warn("Failed to split WAL {} by server {}, retry...", walPath, worker); + setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER); + return Flow.HAS_MORE_STATE; + } + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + @Override + protected void rollbackState(MasterProcedureEnv env, + MasterProcedureProtos.SplitWALState splitOneWalState) + throws IOException, InterruptedException { + if (splitOneWalState == getInitialState()) { + return; + } + throw new UnsupportedOperationException(); + } + + @Override + protected MasterProcedureProtos.SplitWALState getState(int stateId) { + return MasterProcedureProtos.SplitWALState.forNumber(stateId); + } + + @Override + protected int getStateId(MasterProcedureProtos.SplitWALState state) { + return state.getNumber(); + } + + @Override + protected MasterProcedureProtos.SplitWALState getInitialState() { + return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + MasterProcedureProtos.SplitWALData.Builder builder = + MasterProcedureProtos.SplitWALData.newBuilder(); + builder.setWalPath(walPath).setCrashedServer(ProtobufUtil.toServerName(crashedServer)); + if (worker != null) { + builder.setWorker(ProtobufUtil.toServerName(worker)); + } + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + MasterProcedureProtos.SplitWALData data = + serializer.deserialize(MasterProcedureProtos.SplitWALData.class); + walPath = data.getWalPath(); + crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); + if (data.hasWorker()) { + worker =
ProtobufUtil.toServerName(data.getWorker()); + } + } + + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + protected final ProcedureSuspendedException suspend(long backoff) + throws ProcedureSuspendedException { + setTimeout(Math.toIntExact(backoff)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } + + public String getWAL() { + return walPath; + } + + @VisibleForTesting + public ServerName getWorker() { + return worker; + } + + @Override + public ServerName getServerName() { + return this.crashedServer; + } + + @Override + public boolean hasMetaTableRegion() { + return AbstractFSWALProvider.isMetaFile(new Path(walPath)); + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.SPLIT_WAL; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALQueue.java new file mode 100644 index 0000000..a475077 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALQueue.java @@ -0,0 +1,35 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.hbase.procedure2.LockStatus; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class SplitWALQueue extends Queue<String> { + public SplitWALQueue(String name, LockStatus lockStatus) { + super(name, lockStatus); + } + + @Override + boolean requireExclusiveLock(Procedure<?> proc) { + return false; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java new file mode 100644 index 0000000..5017199 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java @@ -0,0 +1,179 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.procedure2.NoNodeDispatchException; +import org.apache.hadoop.hbase.procedure2.NoServerDispatchException; +import org.apache.hadoop.hbase.procedure2.NullTargetServerDispatchException; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.regionserver.SplitWALCallable; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; +/** + * A remote procedure which is used to send the split WAL request to a region server.
+ */ +@InterfaceAudience.Private +public class SplitWALRemoteProcedure extends Procedure<MasterProcedureEnv> + implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName>, + ServerProcedureInterface { + private static final Logger LOG = LoggerFactory.getLogger(SplitWALRemoteProcedure.class); + private String walPath; + private ServerName worker; + private boolean dispatched; + private ProcedureEvent<?> event; + private boolean success = false; + + public SplitWALRemoteProcedure() { + } + + public SplitWALRemoteProcedure(ServerName serverName, String wal) { + this.worker = serverName; + this.walPath = wal; + } + + @Override + protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (success) { + return null; + } + dispatched = false; + } + try { + env.getRemoteDispatcher().addOperationToNode(worker, this); + } catch (NoNodeDispatchException | NullTargetServerDispatchException + | NoServerDispatchException e) { + LOG.error("Dispatch WAL {} to {} failed", walPath, worker, e); + return null; + } + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + MasterProcedureProtos.SplitWALRemoteData.Builder builder = + MasterProcedureProtos.SplitWALRemoteData.newBuilder(); + builder.setWalPath(walPath) + .setWorker(ProtobufUtil.toServerName(worker)); + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + MasterProcedureProtos.SplitWALRemoteData data = + serializer.deserialize(MasterProcedureProtos.SplitWALRemoteData.class); + walPath = data.getWalPath(); + worker = ProtobufUtil.toServerName(data.getWorker()); + } + + @Override + public RemoteProcedureDispatcher.RemoteOperation remoteCallBuild(MasterProcedureEnv env, + ServerName serverName) { + return new RSProcedureDispatcher.ServerOperation(this, getProcId(), SplitWALCallable.class, + MasterProcedureProtos.SplitWALParameter.newBuilder() + .setTargetServer(ProtobufUtil.toServerName(serverName)).setWalPath(walPath).build() + .toByteArray()); + } + + @Override + public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, + IOException exception) { + complete(env, exception); + } + + @Override + public void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, null); + } + + private void complete(MasterProcedureEnv env, Throwable error) { + if (error == null) { + LOG.info("Split of {} on {} succeeded", walPath, worker); + env.getMasterServices().getMasterWalManager().deleteSplitWal(walPath); + success = true; + } else { + if (error instanceof DoNotRetryIOException) { + LOG.error("WAL split task sent to the wrong server {}, will retry on another server", + worker, error); + success = true; + } else { + LOG.error("Split WAL {} failed, retry...", walPath, error); + success = false; + } + + } + event.wake(env.getProcedureScheduler()); + event = null; + } + + @Override + public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { + LOG.error("Remote call failed for {} on {}", walPath, worker, error); + complete(env,
error); + } + + public String getWAL() { + return this.walPath; + } + + @Override + public ServerName getServerName() { + return this.worker; + } + + @Override + public boolean hasMetaTableRegion() { + return AbstractFSWALProvider.isMetaFile(new Path(walPath)); + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.SPLIT_WAL_REMOTE; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 13f277b..afb3994 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; + import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.management.MemoryType; @@ -619,7 +622,10 @@ public class HRegionServer extends HasThread implements rpcServices.isa.getPort(), this, canCreateBaseZNode()); // If no master in cluster, skip trying to track one or look for a cluster status. if (!this.masterless) { - this.csm = new ZkCoordinatedStateManager(this); + if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + this.csm = new ZkCoordinatedStateManager(this); + } masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); masterAddressTracker.start(); @@ -1949,6 +1955,11 @@ this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); } + if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + this.executorService.startExecutorService(ExecutorType.RS_SPLIT_WAL, conf.getInt( + "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); + } this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); // Start the threads for compacted files discharger @@ -1993,7 +2004,8 @@ sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); - if (this.csm != null) { + if (this.csm != null + && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { // SplitLogWorker needs csm. If none, don't start this. this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory); splitLogWorker.start(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java new file mode 100644 index 0000000..b3d0a0c --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java @@ -0,0 +1,294 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; +import java.util.Optional; +import java.util.concurrent.locks.Lock; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.executor.EventHandler; +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; +import org.apache.hadoop.hbase.replication.ReplicationPeerImpl; +import org.apache.hadoop.hbase.replication.ReplicationUtils; +import org.apache.hadoop.hbase.replication.SyncReplicationState; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.KeyLocker; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; + +@InterfaceAudience.Private +public class SplitWALCallable implements RSProcedureCallable { + private static final Logger LOG = LoggerFactory.getLogger(SplitWALCallable.class); + + private String walPath; + private Exception initError; + private Exception processError; + private HRegionServer rs; + private boolean finished = false; + private final KeyLocker<String> splitWALLocks = new KeyLocker<>(); + private volatile Lock splitWALLock = null; + + + @Override + public void init(byte[] parameter, HRegionServer rs) { + try { + this.rs = rs; + MasterProcedureProtos.SplitWALParameter param = + MasterProcedureProtos.SplitWALParameter.parseFrom(parameter); + if (!rs.getServerName().equals(ProtobufUtil.toServerName(param.getTargetServer()))) { + initError = new WrongTargetServerException( + "this request was sent to the wrong target server " + rs.getServerName()); + } + this.walPath = param.getWalPath(); + } catch (InvalidProtocolBufferException e) { + initError = e; + } + } + + @Override + public EventType getEventType() { + return
EventType.RS_SPLIT_WAL; + } + + @Override + public Void call() throws Exception { + if (initError != null) { + throw initError; + } + // grab a lock + splitWALLock = splitWALLocks.acquireLock(walPath); + try { + // submit split WAL task to executor + splitWal(); + } finally { + splitWALLock.unlock(); + } + waitForComplete(); + + if (processError != null) { + throw processError; + } + return null; + } + + public void finish(boolean success) { + if (!success) { + processError = new IOException("Split WAL " + walPath + " failed at server " + rs.getServerName()); + } + synchronized (this) { + finished = true; + notifyAll(); + } + } + + @VisibleForTesting + public boolean isFinished() { + return finished; + } + + @VisibleForTesting + public boolean isLocked() { + return splitWALLock != null; + } + + public String getWalPath() { + return this.walPath; + } + + private void waitForComplete() throws InterruptedException { + while (!finished) { + LOG.debug("Splitting is still ongoing for WAL {}", this.walPath); + synchronized (this) { + this.wait(2000); + } + } + LOG.info("Split WAL {} succeeded", this.walPath); + } + + private void splitWal() { + WALSplittingHandler walSplittingHandler = new WALSplittingHandler(rs, EventType.RS_LOG_REPLAY, + wal -> splitWAL(wal, rs.getConfiguration(), rs, rs, rs.walFactory), walPath, this); + rs.getExecutorService().submit(walSplittingHandler); + } + + + @FunctionalInterface + public interface SplitWALTaskExecutor { + boolean exec(String name); + } + + private static boolean splitWAL(String name, Configuration conf, RegionServerServices server, + LastSequenceId sequenceIdChecker, WALFactory factory) { + Path walDir; + FileSystem fs; + try { + walDir = FSUtils.getWALRootDir(conf); + fs = walDir.getFileSystem(conf); + } catch (IOException e) { + LOG.warn("Could not find root dir or fs", e); + return false; + } + try { + if (!processSyncReplicationWAL(name, conf, server, fs, walDir)) { + return true; + } + } catch (IOException e) { + LOG.warn("Failed to process sync replication WAL {}", name, e); + return false; + } + try { + if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf, + null, sequenceIdChecker, null, factory)) { + return false; + } + } catch (InterruptedIOException iioe) { + LOG.warn("Log splitting of " + name + " interrupted, resigning", iioe); + return false; + } catch (IOException e) { + if (e instanceof FileNotFoundException) { + // A wal file may not exist anymore.
Nothing can be recovered so move on + LOG.warn("WAL {} does not exist anymore", name, e); + return true; + } + Throwable cause = e.getCause(); + if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException + || cause instanceof ConnectException || cause instanceof SocketTimeoutException)) { + LOG.warn( + "WAL splitting of " + name + " can't connect to the target regionserver, resigning", + e); + return false; + } else if (cause instanceof InterruptedException) { + LOG.warn("WAL splitting of " + name + " interrupted, resigning", e); + return false; + } + LOG.warn("WAL splitting of " + name + " failed, returning error", e); + return false; + } + return true; + } + + // returns whether we need to continue the split work + private static boolean processSyncReplicationWAL(String name, Configuration conf, + RegionServerServices server, FileSystem fs, Path walDir) throws IOException { + Path walFile = new Path(walDir, name); + String filename = walFile.getName(); + Optional<String> optSyncPeerId = + SyncReplicationWALProvider.getSyncReplicationPeerIdFromWALName(filename); + if (!optSyncPeerId.isPresent()) { + return true; + } + String peerId = optSyncPeerId.get(); + ReplicationPeerImpl peer = + server.getReplicationSourceService().getReplicationPeers().getPeer(peerId); + if (peer == null || !peer.getPeerConfig().isSyncReplication()) { + return true; + } + Pair<SyncReplicationState, SyncReplicationState> stateAndNewState = + peer.getSyncReplicationStateAndNewState(); + if (stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE) + && stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) { + // copy the file to remote and overwrite the previous one + String remoteWALDir = peer.getPeerConfig().getRemoteWALDir(); + Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId); + Path tmpRemoteWAL = new Path(remoteWALDirForPeer, filename + ".tmp"); + FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir); + try (FSDataInputStream in = fs.open(walFile); @SuppressWarnings("deprecation") + FSDataOutputStream out = remoteFs.createNonRecursive(tmpRemoteWAL, true, + FSUtils.getDefaultBufferSize(remoteFs), remoteFs.getDefaultReplication(tmpRemoteWAL), + remoteFs.getDefaultBlockSize(tmpRemoteWAL), null)) { + IOUtils.copy(in, out); + } + Path toCommitRemoteWAL = + new Path(remoteWALDirForPeer, filename + ReplicationUtils.RENAME_WAL_SUFFIX); + // Some FileSystem implementations may not support atomic rename so we need to do it in two + // phases + FSUtils.renameFile(remoteFs, tmpRemoteWAL, toCommitRemoteWAL); + FSUtils.renameFile(remoteFs, toCommitRemoteWAL, new Path(remoteWALDirForPeer, filename)); + } else if ((stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE) + && stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) + || stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)) { + // check whether we still need to process this file + // actually we only write wal files whose names end with .syncrep in A state, and after + // transiting to a state other than A, we will reopen all the regions so the data in the wal + // will be flushed and the wal file will be archived soon. But it is still possible that there + // is a server crash when we are transiting from A to S. To simplify the logic of the transit + // procedure, here we will also check the remote snapshot directory in state S, so that we do + // not need to wait until all the wal files with the .syncrep suffix are archived before finishing + // the procedure.
String remoteWALDir = peer.getPeerConfig().getRemoteWALDir(); + Path remoteSnapshotDirForPeer = ReplicationUtils.getPeerSnapshotWALDir(remoteWALDir, peerId); + FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir); + if (remoteFs.exists(new Path(remoteSnapshotDirForPeer, filename))) { + // the file has been replayed when the remote cluster was transited from S to DA, the + // content will be replicated back to us so give up splitting it. + LOG.warn("Give up splitting {} since it has been replayed in the remote cluster and " + + "the content will be replicated back", + filename); + return false; + } + } + return true; + } + + private static class WALSplittingHandler extends EventHandler { + private final SplitWALTaskExecutor executor; + private final String wal; + private final SplitWALCallable callable; + + public WALSplittingHandler(Server server, EventType eventType, SplitWALTaskExecutor executor, + String wal, SplitWALCallable callable) { + super(server, eventType); + this.executor = executor; + this.wal = wal; + this.callable = callable; + } + + @Override + public void process() throws IOException { + LOG.info("Start to process WAL splitting of {}", wal); + boolean success = executor.exec(wal); + callable.finish(success); + } + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/WrongTargetServerException.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/WrongTargetServerException.java new file mode 100644 index 0000000..d230e36 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/WrongTargetServerException.java @@ -0,0 +1,37 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Thrown when a split WAL request is sent to the wrong target server. + */ +@InterfaceAudience.Private +public class WrongTargetServerException extends DoNotRetryIOException { + public WrongTargetServerException() { + super(); + } + + public WrongTargetServerException(String message) { + super(message); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java new file mode 100644 index 0000000..76813cd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java @@ -0,0 +1,258 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestMasterWalManager { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestMasterWalManager.class); + + private static HBaseTestingUtility TEST_UTIL; + private HMaster master; + private MasterWalManager masterWalManager; + private TableName TABLE_NAME; + private byte[] FAMILY; + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.wal.max.splitters", 1); + TEST_UTIL.startMiniCluster(3); + master = TEST_UTIL.getHBaseCluster().getMaster(); + masterWalManager = master.getMasterWalManager(); + TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestMasterWalManager")); + FAMILY = Bytes.toBytes("test"); + } + + @After + public void teardown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testGetWALsToSplit() throws Exception { + TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); + // load table + TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); + ServerName metaServer = 
TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); + List<FileStatus> metaWals = masterWalManager.getWALsToSplit(metaServer, true); + Assert.assertEquals(1, metaWals.size()); + List<FileStatus> wals = masterWalManager.getWALsToSplit(metaServer, false); + Assert.assertEquals(1, wals.size()); + ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() + .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> !rs.equals(metaServer)) + .findAny().get(); + metaWals = masterWalManager.getWALsToSplit(testServer, true); + Assert.assertEquals(0, metaWals.size()); + } + + @Test + public void testSplitLogs() throws Exception { + TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); + // load table + TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); + ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor(); + ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); + ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() + .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> !rs.equals(metaServer)) + .findAny().get(); + Procedure[] procedures = masterWalManager.splitLogs(testServer, false); + Assert.assertEquals(1, procedures.length); + ProcedureTestingUtility.submitAndWait(masterPE, procedures[0]); + Assert.assertEquals(0, masterWalManager.getWALsToSplit(testServer, false).size()); + + procedures = masterWalManager.splitLogs(metaServer, true); + Assert.assertEquals(1, procedures.length); + ProcedureTestingUtility.submitAndWait(masterPE, procedures[0]); + Assert.assertEquals(0, masterWalManager.getWALsToSplit(metaServer, true).size()); + Assert.assertEquals(1, masterWalManager.getWALsToSplit(metaServer, false).size()); + } + + @Test + public void testAcquireAndReleaseSplitWorker() throws Exception { + ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor(); + List<FakeServerProcedure> testProcedures = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + FakeServerProcedure procedure = + new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName()); + testProcedures.add(procedure); + ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE, + HConstants.NO_NONCE); + } + TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired()); + FakeServerProcedure failedProcedure = + new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()); + ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE, + HConstants.NO_NONCE); + TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire()); + Assert.assertFalse(failedProcedure.isWorkerAcquired()); + // let one procedure finish and release its worker + testProcedures.get(0).countDown(); + TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired()); + Assert.assertTrue(testProcedures.get(0).isSuccess()); + } + + public static final class FakeServerProcedure + extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState> + implements ServerProcedureInterface { + + private ServerName serverName; + private ServerName worker; + private CountDownLatch barrier = new CountDownLatch(1); + private boolean triedToAcquire = false; + + public FakeServerProcedure() { + } + + public FakeServerProcedure(ServerName serverName) { + this.serverName = serverName; + } + + public ServerName getServerName() { + return serverName; + } + + @Override + public boolean hasMetaTableRegion() { + return false; + } + + @Override + public ServerOperationType getServerOperationType() { + return SPLIT_WAL; + } +
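+ /** + * Walks the same three states as a real split-WAL procedure: acquire a worker from the + * MasterWalManager, park on the latch until the test calls countDown(), then release the + * worker so the scheduler can wake the next suspended procedure. + */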
+ @Override + protected Flow executeFromState(MasterProcedureEnv env, + MasterProcedureProtos.SplitWALState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + MasterWalManager walManager = env.getMasterServices().getMasterWalManager(); + switch (state) { + case ACQUIRE_SPLIT_WAL_WORKER: + triedToAcquire = true; + worker = walManager.acquireSplitWALWorker(this); + setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER); + return Flow.HAS_MORE_STATE; + case DISPATCH_WAL_TO_WORKER: + barrier.await(); + setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER); + return Flow.HAS_MORE_STATE; + case RELEASE_SPLIT_WORKER: + walManager.releaseSplitWALWorker(worker, env.getProcedureScheduler()); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + public boolean isWorkerAcquired() { + return worker != null; + } + + public boolean isTriedToAcquire() { + return triedToAcquire; + } + + public void countDown() { + this.barrier.countDown(); + } + + @Override + protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state) + throws IOException, InterruptedException { + + } + + @Override + protected MasterProcedureProtos.SplitWALState getState(int stateId) { + return MasterProcedureProtos.SplitWALState.forNumber(stateId); + } + + @Override + protected int getStateId(MasterProcedureProtos.SplitWALState state) { + return state.getNumber(); + } + + @Override + protected MasterProcedureProtos.SplitWALState getInitialState() { + return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER; + } + + @Override + protected boolean holdLock(MasterProcedureEnv env) { + return true; + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java new file mode 100644 index 0000000..9271a42 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestSplitWALManager { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSplitWALManager.class); + + private static HBaseTestingUtility TEST_UTIL; + private HMaster master; + private SplitWALManager splitWALManager; + private TableName TABLE_NAME; + private byte[] FAMILY; + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.wal.max.splitters", 1); + TEST_UTIL.startMiniCluster(3); + master = TEST_UTIL.getHBaseCluster().getMaster(); + splitWALManager = master.getMasterWalManager().getSplitWALManager(); + TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager")); + FAMILY = Bytes.toBytes("test"); + } + + @After + public void teardown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testAcquireAndReleaseSplitWorker() throws Exception { + List<TestMasterWalManager.FakeServerProcedure> testProcedures = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + testProcedures.add(new TestMasterWalManager.FakeServerProcedure( + TEST_UTIL.getHBaseCluster().getServerHoldingMeta())); + } + ServerName server = splitWALManager.acquireSplitWorker(testProcedures.get(0)); + Assert.assertNotNull(server); + Assert.assertNotNull(splitWALManager.acquireSplitWorker(testProcedures.get(1))); + Assert.assertNotNull(splitWALManager.acquireSplitWorker(testProcedures.get(2))); + + Exception e = null; + try { + splitWALManager.acquireSplitWorker(testProcedures.get(3)); + } catch (ProcedureSuspendedException suspendException) { + e = suspendException; + } + Assert.assertNotNull(e); + Assert.assertTrue(e instanceof ProcedureSuspendedException); + + splitWALManager.releaseSplitWorker(server, TEST_UTIL.getHBaseCluster().getMaster() + .getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + Assert.assertNotNull(splitWALManager.acquireSplitWorker(testProcedures.get(3))); + } + + @Test + public void testAddNewServer() throws Exception { + List<TestMasterWalManager.FakeServerProcedure> testProcedures = new ArrayList<>(); + for (int i = 0; i < 4; i++) {
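+ // four fake procedures against three live region servers: with one splitter slot per + // server, the fourth acquireSplitWorker() call below must suspend until a new server joins +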
testProcedures.add(new TestMasterWalManager.FakeServerProcedure( + TEST_UTIL.getHBaseCluster().getServerHoldingMeta())); + } + ServerName server = splitWALManager.acquireSplitWorker(testProcedures.get(0)); + Assert.assertNotNull(server); + Assert.assertNotNull(splitWALManager.acquireSplitWorker(testProcedures.get(1))); + Assert.assertNotNull(splitWALManager.acquireSplitWorker(testProcedures.get(2))); + + Exception e = null; + try { + splitWALManager.acquireSplitWorker(testProcedures.get(3)); + } catch (ProcedureSuspendedException suspendException) { + e = suspendException; + } + Assert.assertNotNull(e); + Assert.assertTrue(e instanceof ProcedureSuspendedException); + + JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer(); + newServer.waitForServerOnline(); + Assert.assertNotNull(splitWALManager.acquireSplitWorker(testProcedures.get(3))); + } + + @Test + public void testCreateSplitWALProcedures() throws Exception { + TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); + // load table + TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); + ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor(); + ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); + Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(), + AbstractFSWALProvider.getWALDirectoryName(metaServer.toString())); + // Test splitting the meta WAL + FileStatus[] wals = + TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER); + Assert.assertEquals(1, wals.length); + List<Procedure> testProcedures = + splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer); + Assert.assertEquals(1, testProcedures.size()); + ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); + Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); + + // Test splitting a non-meta WAL + wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER); + Assert.assertEquals(1, wals.length); + testProcedures = + splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer); + Assert.assertEquals(1, testProcedures.size()); + ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); + Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java index af2076e..9e34887 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -44,9 +46,13 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) @Category({MasterTests.class, LargeTests.class}) public class
TestServerCrashProcedure { @@ -58,6 +64,9 @@ public class TestServerCrashProcedure { protected HBaseTestingUtility util; + @Parameter + public boolean splitWALCoordinatedByZK; + private ProcedureMetrics serverCrashProcMetrics; private long serverCrashSubmittedCount = 0; private long serverCrashFailedCount = 0; @@ -67,6 +76,8 @@ public class TestServerCrashProcedure { conf.set("hbase.balancer.tablesOnMaster", "none"); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 3); + LOG.info("coordinated by zk? {}", splitWALCoordinatedByZK); + conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, splitWALCoordinatedByZK); } @Before @@ -75,7 +86,7 @@ public class TestServerCrashProcedure { setupConf(this.util.getConfiguration()); startMiniCluster(); ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate( - this.util.getHBaseCluster().getMaster().getMasterProcedureExecutor(), false); + this.util.getHBaseCluster().getMaster().getMasterProcedureExecutor(), false); serverCrashProcMetrics = this.util.getHBaseCluster().getMaster().getMasterMetrics() .getServerCrashProcMetrics(); } @@ -87,10 +98,10 @@ public class TestServerCrashProcedure { @After public void tearDown() throws Exception { MiniHBaseCluster cluster = this.util.getHBaseCluster(); - HMaster master = cluster == null? null: cluster.getMaster(); + HMaster master = cluster == null ? null : cluster.getMaster(); if (master != null && master.getMasterProcedureExecutor() != null) { ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate( - master.getMasterProcedureExecutor(), false); + master.getMasterProcedureExecutor(), false); } this.util.shutdownMiniCluster(); } @@ -222,4 +233,9 @@ public class TestServerCrashProcedure { serverCrashSubmittedCount = serverCrashProcMetrics.getSubmittedCounter().getCount(); serverCrashFailedCount = serverCrashProcMetrics.getFailedCounter().getCount(); } + + @Parameterized.Parameters + public static Collection coordinatedByZK() { + return Arrays.asList(false, true); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSplitWALProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSplitWALProcedure.java new file mode 100644 index 0000000..c9a4c45 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSplitWALProcedure.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.procedure; + +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; + +import java.util.List; +import java.util.Optional; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.MasterWalManager; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestSplitWALProcedure { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSplitWALProcedure.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestSplitWALProcedure.class); + + private static HBaseTestingUtility TEST_UTIL; + private HMaster master; + private TableName TABLE_NAME; + private MasterWalManager walManager; + private byte[] FAMILY; + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.wal.max.splitters", 1); + TEST_UTIL.startMiniCluster(3); + master = TEST_UTIL.getHBaseCluster().getMaster(); + walManager = master.getMasterWalManager(); + TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALProcedure")); + FAMILY = Bytes.toBytes("test"); + } + + @After + public void teardown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testHandleDeadWorker() throws Exception { + Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); + for (int i = 0; i < 100; i++) { + TEST_UTIL.loadTable(table, FAMILY); + } + HRegionServer testServer = TEST_UTIL.getHBaseCluster().getRegionServer(0); + List<FileStatus> wals = walManager.getWALsToSplit(testServer.getServerName(), false); + Assert.assertEquals(1, wals.size()); + SplitWALProcedure splitWALProcedure = + new SplitWALProcedure(wals.get(0).getPath().toString(), testServer.getServerName()); + ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor(); + long pid = ProcedureTestingUtility.submitProcedure(masterPE, splitWALProcedure, + HConstants.NO_NONCE, HConstants.NO_NONCE); + TEST_UTIL.waitFor(5000, () -> splitWALProcedure.getWorker() != null); + TEST_UTIL.getHBaseCluster().killRegionServer(splitWALProcedure.getWorker()); + ProcedureTestingUtility.waitProcedure(masterPE, pid); + Assert.assertTrue(splitWALProcedure.isSuccess()); + } + + @Test + public void testMasterRestart() throws Exception { + Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); + for (int i = 0; i < 100; i++) {
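+ // load the table repeatedly so the WAL carries enough edits that splitting is still + // in flight when the master is killed below +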
TEST_UTIL.loadTable(table, FAMILY); + } + HRegionServer testServer = TEST_UTIL.getHBaseCluster().getRegionServer(0); + List<FileStatus> wals = walManager.getWALsToSplit(testServer.getServerName(), false); + Assert.assertEquals(1, wals.size()); + SplitWALProcedure splitWALProcedure = + new SplitWALProcedure(wals.get(0).getPath().toString(), testServer.getServerName()); + long pid = ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), + splitWALProcedure, HConstants.NO_NONCE, HConstants.NO_NONCE); + TEST_UTIL.waitFor(5000, () -> splitWALProcedure.getWorker() != null); + // Kill the master + TEST_UTIL.getHBaseCluster().killMaster(master.getServerName()); + TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000); + // Restart the master + TEST_UTIL.getHBaseCluster().startMaster(); + TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); + this.master = TEST_UTIL.getHBaseCluster().getMaster(); + ProcedureTestingUtility.waitProcedure(master.getMasterProcedureExecutor(), pid); + Optional<Procedure<?>> procedure = + master.getProcedures().stream().filter(p -> p.getProcId() == pid).findAny(); + // make sure the procedure succeeded and the WAL was deleted + Assert.assertTrue(procedure.isPresent()); + Assert.assertTrue(procedure.get().isSuccess()); + Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals.get(0).getPath())); + } + +} -- 2.7.4
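Everything in this patch keys off the new hbase.split.wal.zk.coordinated switch, which defaults to true (the existing ZooKeeper-coordinated splitting). Below is a minimal sketch of pointing a mini cluster at the procedure-v2 path instead; the constant and settings are the ones the tests above use, while the class name and main() wrapper are illustrative assumptions, not part of the patch.

import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;

import org.apache.hadoop.hbase.HBaseTestingUtility;

public class ProcV2SplitExample {
  public static void main(String[] args) throws Exception {
    HBaseTestingUtility util = new HBaseTestingUtility();
    // false routes WAL splitting through SplitWALProcedure on the master
    // instead of the ZooKeeper-coordinated SplitLogManager path
    util.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
    // one splitter slot per region server, as in the tests above
    util.getConfiguration().setInt("hbase.regionserver.wal.max.splitters", 1);
    util.startMiniCluster(3);
    try {
      // killing a region server makes the master split its WALs via
      // ServerCrashProcedure -> SplitWALProcedure
      util.getHBaseCluster().killRegionServer(
          util.getHBaseCluster().getRegionServer(0).getServerName());
    } finally {
      util.shutdownMiniCluster();
    }
  }
}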