From 8324bd97414885b8c1a7e3503a10ba6a33268543 Mon Sep 17 00:00:00 2001 From: Jingyun Tian Date: Thu, 3 Jan 2019 11:28:49 +0800 Subject: [PATCH] HBASE-21588 Procedure v2 wal splitting implementation --- .../org/apache/hadoop/hbase/HConstants.java | 8 + .../src/main/protobuf/MasterProcedure.proto | 25 ++ .../SplitLogWorkerCoordination.java | 3 - .../ZkSplitLogWorkerCoordination.java | 6 +- .../apache/hadoop/hbase/master/HMaster.java | 18 + .../hadoop/hbase/master/MasterServices.java | 7 + .../hadoop/hbase/master/MasterWalManager.java | 6 +- .../hadoop/hbase/master/SplitWALManager.java | 239 +++++++++++ .../procedure/ServerCrashProcedure.java | 76 +++- .../procedure/ServerProcedureInterface.java | 13 +- .../hbase/master/procedure/ServerQueue.java | 3 + .../master/procedure/SplitWALProcedure.java | 199 +++++++++ .../procedure/SplitWALRemoteProcedure.java | 195 +++++++++ .../hbase/regionserver/HRegionServer.java | 16 +- .../hbase/regionserver/SplitLogWorker.java | 10 +- .../hbase/regionserver/SplitWALCallable.java | 109 +++++ .../hadoop/hbase/master/AbstractTestDLS.java | 3 +- .../hbase/master/TestRestartCluster.java | 21 + .../hbase/master/TestRollingRestart.java | 18 +- .../hbase/master/TestSplitWALManager.java | 383 ++++++++++++++++++ .../procedure/TestServerCrashProcedure.java | 19 +- .../procedure/TestSplitWALProcedure.java | 127 ++++++ .../regionserver/TestSplitLogWorker.java | 5 +- 23 files changed, 1484 insertions(+), 25 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java create 
mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSplitWALProcedure.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index fdc3d82677..75ee687d7f 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1311,6 +1311,14 @@ public final class HConstants { public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL = "hbase.client.fast.fail.interceptor.impl"; + public static final String HBASE_SPLIT_WAL_COORDINATED_BY_ZK = "hbase.split.wal.zk.coordinated"; + + public static final boolean DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK = true; + + public static final String HBASE_SPLIT_WAL_MAX_SPLITTER = "hbase.regionserver.wal.max.splitters"; + + public static final int DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER = 2; + /** Config key for if the server should send backpressure and if the client should listen to * that backpressure from the server */ public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled"; diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index cc0c6ba347..f64852e383 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -308,6 +308,8 @@ enum ServerCrashState { SERVER_CRASH_WAIT_ON_ASSIGN = 9; SERVER_CRASH_SPLIT_META_LOGS = 10; SERVER_CRASH_ASSIGN_META = 11; + SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR=12; + SERVER_CRASH_DELETE_SPLIT_WALS_DIR=13; SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true]; SERVER_CRASH_FINISH = 100; } @@ -551,3 +553,26 @@ message OpenRegionProcedureStateData { message CloseRegionProcedureStateData { optional ServerName assign_candidate = 1; } + +message SplitWALParameter { + required string 
wal_path = 1; +} + + +message SplitWALData{ + required string wal_path = 1; + required ServerName crashed_server=2; + optional ServerName worker = 3; +} + +message SplitWALRemoteData{ + required string wal_path = 1; + required ServerName crashed_server=2; + required ServerName worker = 3; +} + +enum SplitWALState{ + ACQUIRE_SPLIT_WAL_WORKER = 1; + DISPATCH_WAL_TO_WORKER = 2; + RELEASE_SPLIT_WORKER = 3; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java index ab04f6016e..ad74015490 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java @@ -48,9 +48,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti @InterfaceAudience.Private public interface SplitLogWorkerCoordination { -/* SplitLogWorker part */ - int DEFAULT_MAX_SPLITTERS = 2; - /** * Initialize internal values. 
This method should be used when corresponding SplitLogWorker * instance is created diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java index ff555f22ad..7ceaaec362 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java @@ -19,6 +19,9 @@ package org.apache.hadoop.hbase.coordination; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; + import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -135,7 +138,8 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements this.server = server; this.worker = worker; this.splitTaskExecutor = splitExecutor; - maxConcurrentTasks = conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS); + maxConcurrentTasks = + conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER); reportPeriod = conf.getInt("hbase.splitlog.report.period", conf.getInt(HConstants.HBASE_SPLITLOG_MANAGER_TIMEOUT, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0bcef59eb2..8d47db440f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.master; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; +import static 
org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; import com.google.protobuf.Descriptors; import com.google.protobuf.Service; @@ -338,6 +340,13 @@ public class HMaster extends HRegionServer implements MasterServices { private MasterFileSystem fileSystemManager; private MasterWalManager walManager; + // manager to manage procedure-based WAL splitting, can be null if current + // is zk-based WAL splitting. SplitWALManager will replace SplitLogManager + // and MasterWalManager, which means zk-based WAL splitting code will be + // useless after we switch to the procedure-based one. our eventual goal + // is to remove all the zk-based WAL splitting code. + private SplitWALManager splitWALManager; + // server manager to deal with region server info private volatile ServerManager serverManager; @@ -945,6 +954,10 @@ public class HMaster extends HRegionServer implements MasterServices { // loading. this.serverManager = createServerManager(this); this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this); + if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + this.splitWALManager = new SplitWALManager(this); + } createProcedureExecutor(); @SuppressWarnings("rawtypes") Map, List>> procsByType = @@ -1400,6 +1413,11 @@ public class HMaster extends HRegionServer implements MasterServices { return this.walManager; } + @Override + public SplitWALManager getSplitWALManager() { + return splitWALManager; + } + @Override public TableStateManager getTableStateManager() { return tableStateManager; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 27ef5f8cfc..12c78ac8f6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -510,4 +510,11 @@ public 
interface MasterServices extends Server { * @return True if cluster is up; false if cluster is not up (we are shutting down). */ boolean isClusterUp(); + + /** + * @return return null if current is zk-based WAL splitting + */ + default SplitWALManager getSplitWALManager(){ + return null; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java index 5ab1c28d49..fbf4594168 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java @@ -60,7 +60,8 @@ public class MasterWalManager { } }; - final static PathFilter NON_META_FILTER = new PathFilter() { + @VisibleForTesting + public final static PathFilter NON_META_FILTER = new PathFilter() { @Override public boolean accept(Path p) { return !AbstractFSWALProvider.isMetaFile(p); @@ -167,7 +168,6 @@ public class MasterWalManager { /** * @return listing of ServerNames found by parsing WAL directory paths in FS. - * */ public Set getServerNamesFromWALDirPath(final PathFilter filter) throws IOException { FileStatus[] walDirForServerNames = getWALDirPaths(filter); @@ -290,7 +290,7 @@ public class MasterWalManager { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", justification= "We only release this lock when we set it. 
Updates to code that uses it should verify use " + "of the guard boolean.") - private List getLogDirs(final Set serverNames) throws IOException { + List getLogDirs(final Set serverNames) throws IOException { List logDirs = new ArrayList<>(); boolean needReleaseLock = false; if (!this.services.isInitialized()) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java new file mode 100644 index 0000000000..fc50840f49 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java @@ -0,0 +1,239 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; +import static org.apache.hadoop.hbase.master.MasterWalManager.META_FILTER; +import static org.apache.hadoop.hbase.master.MasterWalManager.NON_META_FILTER; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; +import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +/** + * Create {@link SplitWALProcedure} for each WAL which need to split. Manage the workers for each + * {@link SplitWALProcedure}. + * Total number of workers is (number of online servers) * (HBASE_SPLIT_WAL_MAX_SPLITTER). + * Helps assign and release workers for split tasks. + * Provide helper method to delete split WAL file and directory. 
+ * + * The user can get the SplitWALProcedures via splitWALs(crashedServer, splitMeta) + * can get the files that need to split via getWALsToSplit(crashedServer, splitMeta) + * can delete the splitting WAL and directory via deleteSplitWAL(wal) + * and deleteSplitWAL(crashedServer) + * can check if splitting WALs of a crashed server is success via isSplitWALFinished(walPath) + * can acquire and release a worker for splitting WAL via acquireSplitWALWorker(procedure) + * and releaseSplitWALWorker(worker, scheduler) + * + * This class is to replace the zk-based WAL splitting related code, {@link MasterWalManager}, + * {@link SplitLogManager}, {@link org.apache.hadoop.hbase.zookeeper.ZKSplitLog} and + * {@link org.apache.hadoop.hbase.coordination.ZKSplitLogManagerCoordination} can be removed + * after we switch to procedure-based WAL splitting. + */ +@InterfaceAudience.Private +public class SplitWALManager { + private static final Logger LOG = LoggerFactory.getLogger(SplitWALManager.class); + + private final MasterServices master; + private final SplitWorkerAssigner splitWorkerAssigner; + private final Path rootDir; + private final FileSystem fs; + private final Configuration conf; + + public SplitWALManager(MasterServices master) { + this.master = master; + this.conf = master.getConfiguration(); + this.splitWorkerAssigner = new SplitWorkerAssigner(this.master, + conf.getInt(HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER)); + this.rootDir = master.getMasterFileSystem().getWALRootDir(); + this.fs = master.getMasterFileSystem().getFileSystem(); + + } + + public List splitWALs(ServerName crashedServer, boolean splitMeta) + throws IOException { + try { + // 1. list all splitting files + List splittingFiles = getWALsToSplit(crashedServer, splitMeta); + // 2. 
create corresponding procedures + return createSplitWALProcedures(splittingFiles, crashedServer); + } catch (IOException e) { + LOG.error("failed to create procedures for splitting logs of {}", crashedServer, e); + throw e; + } + } + + public List getWALsToSplit(ServerName serverName, boolean splitMeta) + throws IOException { + List logDirs = master.getMasterWalManager().getLogDirs(Collections.singleton(serverName)); + FileStatus[] fileStatuses = + SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER); + LOG.info("size of WALs of {} is {}, isMeta: {}", serverName, fileStatuses.length, splitMeta); + return Lists.newArrayList(fileStatuses); + } + + private Path getWALSplitDir(ServerName serverName) { + Path logDir = + new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); + return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); + } + + public void deleteSplitWAL(String wal) throws IOException { + fs.delete(new Path(wal), false); + } + + public void deleteWALDir(ServerName serverName) throws IOException { + Path splitDir = getWALSplitDir(serverName); + fs.delete(splitDir, false); + } + + public boolean isSplitWALFinished(String walPath) throws IOException { + return !fs.exists(new Path(rootDir, walPath)); + } + + @VisibleForTesting + List createSplitWALProcedures(List splittingWALs, + ServerName crashedServer) { + return splittingWALs.stream() + .map(wal -> new SplitWALProcedure(wal.getPath().toString(), crashedServer)) + .collect(Collectors.toList()); + } + + /** + * Try to acquire a worker from the online servers to execute the split task + * @param procedure split WAL task + * @return an available region server which could execute this task + * @throws ProcedureSuspendedException if there is no available worker, + * it will throw this exception to let the procedure wait + */ + public ServerName acquireSplitWALWorker(Procedure procedure) + throws ProcedureSuspendedException { + Optional worker = 
splitWorkerAssigner.acquire(); + LOG.debug("acquired a worker {} to split a WAL", worker); + if (worker.isPresent()) { + return worker.get(); + } + splitWorkerAssigner.suspend(procedure); + throw new ProcedureSuspendedException(); + } + + /** + * After the worker finished the split WAL task, it will release the worker, and wake up all the + * suspended procedures in the ProcedureEvent + * @param worker worker which is about to be released + * @param scheduler scheduler which is to wake up the procedure event + */ + public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) { + LOG.debug("release a worker {} to split a WAL", worker); + splitWorkerAssigner.release(worker); + splitWorkerAssigner.wake(scheduler); + } + + /** + * When the master restarts, there will be a new splitWorkerAssigner. But if there are splitting WAL + * tasks running on the region server side, they will not be counted by the new splitWorkerAssigner. + * Thus we should add the workers of running tasks to the assigner when we load the procedures + * from MasterProcWALs. 
+ * @param worker region server which is executing a split WAL task + */ + public void addUsedSplitWALWorker(ServerName worker){ + splitWorkerAssigner.addUsedWorker(worker); + } + + /** + * help assign and release a worker for each WAL splitting task + * For each worker, concurrent running splitting task should be no more than maxSplitTasks + * If a task failed to acquire a worker, it will suspend and wait for workers available + * + */ + private static final class SplitWorkerAssigner implements ServerListener { + private int maxSplitTasks; + private final ProcedureEvent event; + private Map currentWorkers = new HashMap<>(); + private MasterServices master; + + public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) { + this.maxSplitTasks = maxSplitTasks; + this.master = master; + this.event = new ProcedureEvent<>("split-WAL-worker-assigning"); + this.master.getServerManager().registerListener(this); + } + + public synchronized Optional acquire() { + List serverList = master.getServerManager().getOnlineServersList(); + Collections.shuffle(serverList); + Optional worker = serverList.stream().filter( + serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0) + .findAny(); + if (worker.isPresent()) { + currentWorkers.compute(worker.get(), (serverName, + availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); + } + return worker; + } + + public synchronized void release(ServerName serverName) { + currentWorkers.compute(serverName, (k, v) -> v == null ? 
null : v + 1); + } + + public void suspend(Procedure proc) { + event.suspend(); + event.suspendIfNotReady(proc); + } + + public void wake(MasterProcedureScheduler scheduler) { + if (!event.isReady()) { + event.wake(scheduler); + } + } + + @Override + public void serverAdded(ServerName worker) { + this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + } + + public synchronized void addUsedWorker(ServerName worker) { + // load used worker when master restart + currentWorkers.compute(worker, (serverName, + availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index 05bcd28d1e..20727279c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.master.procedure; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -28,9 +31,11 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterWalManager; +import org.apache.hadoop.hbase.master.SplitWALManager; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.assignment.RegionStateNode; import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; import 
org.apache.hadoop.hbase.procedure2.ProcedureMetrics; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; @@ -107,6 +112,7 @@ public class ServerCrashProcedure protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state) throws ProcedureSuspendedException, ProcedureYieldException { final MasterServices services = env.getMasterServices(); + final AssignmentManager am = env.getAssignmentManager(); // HBASE-14802 // If we have not yet notified that we are processing a dead server, we should do now. if (!notifiedDeadServer) { @@ -117,6 +123,7 @@ public class ServerCrashProcedure switch (state) { case SERVER_CRASH_START: case SERVER_CRASH_SPLIT_META_LOGS: + case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR: case SERVER_CRASH_ASSIGN_META: break; default: @@ -137,8 +144,24 @@ public class ServerCrashProcedure } break; case SERVER_CRASH_SPLIT_META_LOGS: - splitMetaLogs(env); - setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META); + if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + splitMetaLogs(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META); + } else { + am.getRegionStates().metaLogSplitting(serverName); + addChildProcedure(createSplittingWalProcedures(env, true)); + setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR); + } + break; + case SERVER_CRASH_DELETE_SPLIT_META_WALS_DIR: + if(isSplittingDone(env, true)){ + cleanupSplitDir(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META); + am.getRegionStates().metaLogSplit(serverName); + } else { + setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS); + } break; case SERVER_CRASH_ASSIGN_META: assignRegions(env, Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO)); @@ -156,8 +179,24 @@ public class ServerCrashProcedure } break; case SERVER_CRASH_SPLIT_LOGS: - splitLogs(env); - 
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + splitLogs(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + } else { + am.getRegionStates().logSplitting(this.serverName); + addChildProcedure(createSplittingWalProcedures(env, false)); + setNextState(ServerCrashState.SERVER_CRASH_DELETE_SPLIT_WALS_DIR); + } + break; + case SERVER_CRASH_DELETE_SPLIT_WALS_DIR: + if (isSplittingDone(env, false)) { + cleanupSplitDir(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + am.getRegionStates().logSplit(this.serverName); + } else { + setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS); + } break; case SERVER_CRASH_ASSIGN: // If no regions to assign, skip assign and skip to the finish. @@ -179,6 +218,7 @@ public class ServerCrashProcedure setNextState(ServerCrashState.SERVER_CRASH_FINISH); break; case SERVER_CRASH_FINISH: + LOG.info("removed crashed server {} after splitting done", serverName); services.getAssignmentManager().getRegionStates().removeServer(serverName); services.getServerManager().getDeadServers().finish(serverName); return Flow.NO_MORE_STATE; @@ -191,6 +231,34 @@ public class ServerCrashProcedure return Flow.HAS_MORE_STATE; } + private void cleanupSplitDir(MasterProcedureEnv env) { + SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager(); + try { + splitWALManager.deleteWALDir(serverName); + } catch (IOException e) { + LOG.warn("remove WAL directory of server {} failed, ignore...", serverName, e); + } + } + + private boolean isSplittingDone(MasterProcedureEnv env, boolean splitMeta) { + LOG.debug("check if splitting WALs of {} done? 
isMeta: {}", serverName, splitMeta); + SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager(); + try { + return splitWALManager.getWALsToSplit(serverName, splitMeta).size() == 0; + } catch (IOException e) { + LOG.warn("get filelist of serverName {} failed, retry...", serverName, e); + return false; + } + } + + private Procedure[] createSplittingWalProcedures(MasterProcedureEnv env, boolean splitMeta) + throws IOException { + LOG.info("Splitting WALs {}, isMeta: {}", this, splitMeta); + SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager(); + List procedures = splitWALManager.splitWALs(serverName, splitMeta); + return procedures.toArray(new Procedure[procedures.size()]); + } + private boolean filterDefaultMetaRegions() { if (regionsOnCrashedServer == null) { return false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java index f3c10efa42..b7095d84e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java @@ -27,7 +27,18 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public interface ServerProcedureInterface { public enum ServerOperationType { - CRASH_HANDLER + CRASH_HANDLER, + /** + * help find a available region server as worker and release worker after task done + * invoke SPLIT_WAL_REMOTE operation to send real WAL splitting request to worker + * manage the split wal task flow, will retry if SPLIT_WAL_REMOTE failed + */ + SPLIT_WAL, + + /** + * send the split WAL request to region server and handle the response + */ + SPLIT_WAL_REMOTE } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java index 3a1b3c4cd6..42a46a8284 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java @@ -35,6 +35,9 @@ class ServerQueue extends Queue { switch (spi.getServerOperationType()) { case CRASH_HANDLER: return true; + case SPLIT_WAL: + case SPLIT_WAL_REMOTE: + return false; default: break; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java new file mode 100644 index 0000000000..3b2d0d504e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java @@ -0,0 +1,199 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.SplitWALManager; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureUtil; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; + +/** + * The procedure is to split a WAL. It will get an available region server and + * schedule a {@link SplitWALRemoteProcedure} to actually send the request to region + * server to split this WAL. + * It also check if the split wal task really succeed. If the WAL still exists, it will + * schedule another region server to split this WAL. 
+ */ +@InterfaceAudience.Private +public class SplitWALProcedure + extends StateMachineProcedure + implements ServerProcedureInterface { + private static final Logger LOG = LoggerFactory.getLogger(SplitWALProcedure.class); + private String walPath; + private ServerName worker; + private ServerName crashedServer; + private int attempts = 0; + + public SplitWALProcedure() { + } + + public SplitWALProcedure(String walPath, ServerName crashedServer) { + this.walPath = walPath; + this.crashedServer = crashedServer; + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state) + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager(); + switch (state) { + case ACQUIRE_SPLIT_WAL_WORKER: + worker = splitWALManager.acquireSplitWALWorker(this); + setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER); + return Flow.HAS_MORE_STATE; + case DISPATCH_WAL_TO_WORKER: + assert worker != null; + addChildProcedure(new SplitWALRemoteProcedure(worker, crashedServer, walPath)); + setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER); + return Flow.HAS_MORE_STATE; + case RELEASE_SPLIT_WORKER: + boolean finished; + try { + finished = splitWALManager.isSplitWALFinished(walPath); + } catch (IOException ioe) { + long backoff = ProcedureUtil.getBackoffTimeMs(attempts++); + LOG.warn( + "Failed to check whether splitting wal {} success, wait {} seconds to retry", + walPath, backoff / 1000, ioe); + throw suspend(backoff); + } + splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler()); + if (!finished) { + LOG.warn("Failed to split wal {} by server {}, retry...", walPath, worker); + setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER); + return Flow.HAS_MORE_STATE; + } + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled 
state=" + state); + } + } + + @Override + protected void rollbackState(MasterProcedureEnv env, + MasterProcedureProtos.SplitWALState splitOneWalState) + throws IOException, InterruptedException { + if (splitOneWalState == getInitialState()) { + return; + } + throw new UnsupportedOperationException(); + } + + @Override + protected MasterProcedureProtos.SplitWALState getState(int stateId) { + return MasterProcedureProtos.SplitWALState.forNumber(stateId); + } + + @Override + protected int getStateId(MasterProcedureProtos.SplitWALState state) { + return state.getNumber(); + } + + @Override + protected MasterProcedureProtos.SplitWALState getInitialState() { + return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.serializeStateData(serializer); + MasterProcedureProtos.SplitWALData.Builder builder = + MasterProcedureProtos.SplitWALData.newBuilder(); + builder.setWalPath(walPath).setCrashedServer(ProtobufUtil.toServerName(crashedServer)); + if (worker != null) { + builder.setWorker(ProtobufUtil.toServerName(worker)); + } + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + super.deserializeStateData(serializer); + MasterProcedureProtos.SplitWALData data = + serializer.deserialize(MasterProcedureProtos.SplitWALData.class); + walPath = data.getWalPath(); + crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); + if (data.hasWorker()) { + worker = ProtobufUtil.toServerName(data.getWorker()); + } + } + + @Override + protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + env.getProcedureScheduler().addFront(this); + return false; + } + + protected final ProcedureSuspendedException suspend(long backoff) + throws ProcedureSuspendedException { + attempts++; + 
setTimeout(Math.toIntExact(backoff)); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } + + public String getWAL() { + return walPath; + } + + @VisibleForTesting + public ServerName getWorker(){ + return worker; + } + + @Override + public ServerName getServerName() { + return this.crashedServer; + } + + @Override + public boolean hasMetaTableRegion() { + return AbstractFSWALProvider.isMetaFile(new Path(walPath)); + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.SPLIT_WAL; + } + + @Override + protected void afterReplay(MasterProcedureEnv env){ + if(worker != null){ + env.getMasterServices().getSplitWALManager().addUsedSplitWALWorker(worker); + } + + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java new file mode 100644 index 0000000000..fb2dbd7926 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java @@ -0,0 +1,195 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.procedure2.NoNodeDispatchException; +import org.apache.hadoop.hbase.procedure2.NoServerDispatchException; +import org.apache.hadoop.hbase.procedure2.NullTargetServerDispatchException; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.procedure2.RemoteProcedureException; +import org.apache.hadoop.hbase.regionserver.SplitWALCallable; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; +/** + * A remote procedure which is used to send split WAL request to region server. + * it will return null if the task is succeed or return a DoNotRetryIOException + * {@link SplitWALProcedure} will help handle the situation that encounter + * DoNotRetryIOException. Otherwise it will retry until succeed. 
+ */ +@InterfaceAudience.Private +public class SplitWALRemoteProcedure extends Procedure + implements RemoteProcedureDispatcher.RemoteProcedure, + ServerProcedureInterface { + private static final Logger LOG = LoggerFactory.getLogger(SplitWALRemoteProcedure.class); + private String walPath; + private ServerName worker; + private ServerName crashedServer; + private boolean dispatched; + private ProcedureEvent event; + private boolean success = false; + + public SplitWALRemoteProcedure() { + } + + public SplitWALRemoteProcedure(ServerName worker, ServerName crashedServer, String wal) { + this.worker = worker; + this.crashedServer = crashedServer; + this.walPath = wal; + } + + @Override + protected Procedure[] execute(MasterProcedureEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + if (dispatched) { + if (success) { + return null; + } + dispatched = false; + } + try { + env.getRemoteDispatcher().addOperationToNode(worker, this); + } catch (NoNodeDispatchException | NullTargetServerDispatchException + | NoServerDispatchException e) { + // When send to a wrong target server, it need construct a new SplitWALRemoteProcedure. + // Thus return null for this procedure and let SplitWALProcedure to handle this. 
+ LOG.warn("dispatch WAL {} to {} failed, will retry on another server", walPath, worker, e); + return null; + } + dispatched = true; + event = new ProcedureEvent<>(this); + event.suspendIfNotReady(this); + throw new ProcedureSuspendedException(); + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + MasterProcedureProtos.SplitWALRemoteData.Builder builder = + MasterProcedureProtos.SplitWALRemoteData.newBuilder(); + builder.setWalPath(walPath).setWorker(ProtobufUtil.toServerName(worker)) + .setCrashedServer(ProtobufUtil.toServerName(crashedServer)); + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + MasterProcedureProtos.SplitWALRemoteData data = + serializer.deserialize(MasterProcedureProtos.SplitWALRemoteData.class); + walPath = data.getWalPath(); + worker = ProtobufUtil.toServerName(data.getWorker()); + crashedServer = ProtobufUtil.toServerName(data.getCrashedServer()); + } + + @Override + public RemoteProcedureDispatcher.RemoteOperation remoteCallBuild(MasterProcedureEnv env, + ServerName serverName) { + return new RSProcedureDispatcher.ServerOperation(this, getProcId(), SplitWALCallable.class, + MasterProcedureProtos.SplitWALParameter.newBuilder().setWalPath(walPath).build() + .toByteArray()); + } + + @Override + public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName, + IOException exception) { + complete(env, exception); + } + + @Override + public void remoteOperationCompleted(MasterProcedureEnv env) { + complete(env, null); + } + + private void complete(MasterProcedureEnv env, Throwable error) { + if (event == null) { + 
LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery", + getProcId()); + return; + } + if (error == null) { + LOG.info("split WAL {} on {} succeeded", walPath, worker); + try { + env.getMasterServices().getSplitWALManager().deleteSplitWAL(walPath); + } catch (IOException e){ + LOG.warn("remove WAL {} failed, ignore...", walPath, e); + } + success = true; + } else { + if (error instanceof DoNotRetryIOException) { + LOG.warn("WAL split task of {} send to a wrong server {}, will retry on another server", + walPath, worker, error); + success = true; + } else { + LOG.warn("split WAL {} failed, retry...", walPath, error); + success = false; + } + + } + event.wake(env.getProcedureScheduler()); + event = null; + } + + @Override + public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) { + complete(env, error); + } + + public String getWAL() { + return this.walPath; + } + + @Override + public ServerName getServerName() { + // return the crashed server is to use the queue of root ServerCrashProcedure + return this.crashedServer; + } + + @Override + public boolean hasMetaTableRegion() { + return AbstractFSWALProvider.isMetaFile(new Path(walPath)); + } + + @Override + public ServerOperationType getServerOperationType() { + return ServerOperationType.SPLIT_WAL_REMOTE; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 13f277b23b..f2ee11cf2b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -17,6 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER; +import 
static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; + import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.management.MemoryType; @@ -88,7 +93,6 @@ import org.apache.hadoop.hbase.client.locking.EntityLock; import org.apache.hadoop.hbase.client.locking.LockServiceClient; import org.apache.hadoop.hbase.conf.ConfigurationManager; import org.apache.hadoop.hbase.conf.ConfigurationObserver; -import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.exceptions.RegionMovedException; @@ -619,7 +623,10 @@ public class HRegionServer extends HasThread implements rpcServices.isa.getPort(), this, canCreateBaseZNode()); // If no master in cluster, skip trying to track one or look for a cluster status. 
if (!this.masterless) { - this.csm = new ZkCoordinatedStateManager(this); + if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + this.csm = new ZkCoordinatedStateManager(this); + } masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); masterAddressTracker.start(); @@ -1950,7 +1957,7 @@ public class HRegionServer extends HasThread implements conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); } this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( - "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); + HBASE_SPLIT_WAL_MAX_SPLITTER, DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER)); // Start the threads for compacted files discharger this.executorService.startExecutorService(ExecutorType.RS_COMPACTED_FILES_DISCHARGER, conf.getInt(CompactionConfiguration.HBASE_HFILE_COMPACTION_DISCHARGER_THREAD_COUNT, 10)); @@ -1993,7 +2000,8 @@ public class HRegionServer extends HasThread implements sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); - if (this.csm != null) { + if (this.csm != null && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { // SplitLogWorker needs csm. If none, don't start this. 
this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory); splitLogWorker.start(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java index 4a9712cfba..13804ed4f8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java @@ -152,7 +152,7 @@ public class SplitLogWorker implements Runnable { return true; } - private static Status splitLog(String name, CancelableProgressable p, Configuration conf, + static Status splitLog(String name, CancelableProgressable p, Configuration conf, RegionServerServices server, LastSequenceId sequenceIdChecker, WALFactory factory) { Path walDir; FileSystem fs; @@ -175,9 +175,11 @@ public class SplitLogWorker implements Runnable { // interrupted or has encountered a transient error and when it has // encountered a bad non-retry-able persistent error. try { - if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf, - p, sequenceIdChecker, server.getCoordinatedStateManager().getSplitLogWorkerCoordination(), - factory)) { + SplitLogWorkerCoordination splitLogWorkerCoordination = + server.getCoordinatedStateManager() == null ? 
null + : server.getCoordinatedStateManager().getSplitLogWorkerCoordination(); + if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf, p, + sequenceIdChecker, splitLogWorkerCoordination, factory)) { return Status.PREEMPTED; } } catch (InterruptedIOException iioe) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java new file mode 100644 index 0000000000..b94df22c82 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java @@ -0,0 +1,109 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.concurrent.locks.Lock; + +import org.apache.hadoop.hbase.executor.EventType; +import org.apache.hadoop.hbase.procedure2.RSProcedureCallable; +import org.apache.hadoop.hbase.util.KeyLocker; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; + +/** + * This callable is used to do the real split WAL task. It is called by + * {@link org.apache.hadoop.hbase.master.procedure.SplitWALRemoteProcedure} from master and executed + * by executor service which is in charge of executing the events of EventType.RS_LOG_REPLAY + * + * When execute this callable, it will call SplitLogWorker.splitLog() to split the WAL. + * If the return value is SplitLogWorker.TaskExecutor.Status.DONE, it means the task is successful + * and it will return null to end the call. Otherwise it will throw an exception and let + * {@link org.apache.hadoop.hbase.master.procedure.SplitWALRemoteProcedure} to handle this problem. + * + * This class is to replace the zk-based WAL splitting related code, {@link SplitLogWorker}, + * {@link org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination} and + * {@link org.apache.hadoop.hbase.coordination.ZkSplitLogWorkerCoordination} can be removed after + * we switch to procedure-based WAL splitting. 
+ */ +@InterfaceAudience.Private +public class SplitWALCallable implements RSProcedureCallable { + private static final Logger LOG = LoggerFactory.getLogger(SplitWALCallable.class); + + private String walPath; + private Exception initError; + private HRegionServer rs; + private final KeyLocker splitWALLocks = new KeyLocker<>(); + private volatile Lock splitWALLock = null; + + + @Override + public void init(byte[] parameter, HRegionServer rs) { + try { + this.rs = rs; + MasterProcedureProtos.SplitWALParameter param = + MasterProcedureProtos.SplitWALParameter.parseFrom(parameter); + this.walPath = param.getWalPath(); + } catch (InvalidProtocolBufferException e) { + LOG.error("parse proto buffer of split WAL request failed ", e); + initError = e; + } + } + + @Override + public EventType getEventType() { + return EventType.RS_LOG_REPLAY; + } + + @Override + public Void call() throws Exception { + if (initError != null) { + throw initError; + } + //grab a lock + splitWALLock = splitWALLocks.acquireLock(walPath); + try{ + splitWal(); + LOG.info("split WAL {} succeed.", walPath); + } catch (IOException e){ + LOG.warn("failed to split WAL {}.", walPath, e); + throw e; + } + finally { + splitWALLock.unlock(); + } + return null; + } + + public String getWalPath() { + return this.walPath; + } + + private void splitWal() throws IOException { + SplitLogWorker.TaskExecutor.Status status = + SplitLogWorker.splitLog(walPath, null, rs.getConfiguration(), rs, rs, rs.walFactory); + if (status != SplitLogWorker.TaskExecutor.Status.DONE) { + throw new IOException("Split WAL " + walPath + " failed at server "); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java index 388c53dc05..d72e756a0f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.master; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; import static org.apache.hadoop.hbase.SplitLogCounters.tot_mgr_wait_for_zk_delete; import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_final_transition_failed; import static org.apache.hadoop.hbase.SplitLogCounters.tot_wkr_preempt_task; @@ -143,7 +144,7 @@ public abstract class AbstractTestDLS { conf.setInt("zookeeper.recovery.retry", 0); conf.setInt(HConstants.REGIONSERVER_INFO_PORT, -1); conf.setFloat(HConstants.LOAD_BALANCER_SLOP_KEY, (float) 100.0); // no load balancing - conf.setInt("hbase.regionserver.wal.max.splitters", 3); + conf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 3); conf.setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 10); conf.set("hbase.wal.provider", getWalProvider()); StartMiniClusterOption option = StartMiniClusterOption.builder() diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java index e55e375a5e..242455cfe5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRestartCluster.java @@ -22,6 +22,8 @@ import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -47,12 +49,16 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil; import org.apache.hadoop.hbase.util.Threads; import org.junit.After; import org.junit.Assert; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; 
+import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) @Category({ MasterTests.class, LargeTests.class }) public class TestRestartCluster { @@ -63,6 +69,9 @@ public class TestRestartCluster { private static final Logger LOG = LoggerFactory.getLogger(TestRestartCluster.class); private HBaseTestingUtility UTIL = new HBaseTestingUtility(); + @Parameterized.Parameter + public boolean splitWALCoordinatedByZK; + private static final TableName[] TABLES = { TableName.valueOf("restartTableOne"), TableName.valueOf("restartTableTwo"), @@ -70,6 +79,13 @@ public class TestRestartCluster { }; private static final byte[] FAMILY = Bytes.toBytes("family"); + @Before + public void setup() throws Exception { + LOG.info("WAL splitting coordinated by zk? {}", splitWALCoordinatedByZK); + UTIL.getConfiguration().setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + splitWALCoordinatedByZK); + } + @After public void tearDown() throws Exception { UTIL.shutdownMiniCluster(); } @@ -301,4 +317,9 @@ public class TestRestartCluster { Thread.sleep(100); } } + + @Parameterized.Parameters + public static Collection coordinatedByZK() { + return Arrays.asList(false, true); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java index 8a6f708f10..0aba487478 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRollingRestart.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.master; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.NavigableSet; import java.util.Set; @@ -28,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import 
org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.MiniHBaseCluster; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.StartMiniClusterOption; @@ -45,6 +48,8 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +58,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; /** * Tests the restarting of everything as done during rolling restarts. */ +@RunWith(Parameterized.class) @Category({MasterTests.class, LargeTests.class}) public class TestRollingRestart { @@ -65,6 +71,9 @@ public class TestRollingRestart { @Rule public TestName name = new TestName(); + @Parameterized.Parameter + public boolean splitWALCoordinatedByZK; + @Test public void testBasicRollingRestart() throws Exception { @@ -78,6 +87,8 @@ public class TestRollingRestart { // Start the cluster log("Starting cluster"); Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + splitWALCoordinatedByZK); HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf); StartMiniClusterOption option = StartMiniClusterOption.builder() .numMasters(NUM_MASTERS).numRegionServers(NUM_RS).numDataNodes(NUM_RS).build(); @@ -87,7 +98,8 @@ public class TestRollingRestart { cluster.waitForActiveAndReadyMaster(); // Create a table with regions - final TableName tableName = TableName.valueOf(name.getMethodName()); + final TableName tableName = + TableName.valueOf(name.getMethodName().replaceAll("[\\[|\\]]", "-")); byte [] family = Bytes.toBytes("family"); log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions"); Table ht = 
TEST_UTIL.createMultiRegionTable(tableName, family, NUM_REGIONS_TO_CREATE); @@ -284,5 +296,9 @@ public class TestRollingRestart { } + @Parameterized.Parameters + public static Collection coordinatedByZK() { + return Arrays.asList(false, true); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java new file mode 100644 index 0000000000..9e127c0727 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java @@ -0,0 +1,383 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master; + +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; +import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.SPLIT_WAL; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.StateMachineProcedure; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos; + +@Category({ MasterTests.class, MediumTests.class }) + +public class TestSplitWALManager { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSplitWALManager.class); + + private static HBaseTestingUtility TEST_UTIL; + private HMaster master; + private SplitWALManager splitWALManager; + private TableName TABLE_NAME; + private byte[] FAMILY; + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); + TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1); + TEST_UTIL.startMiniCluster(3); + master = TEST_UTIL.getHBaseCluster().getMaster(); + splitWALManager = master.getSplitWALManager(); + TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALManager")); + FAMILY = Bytes.toBytes("test"); + } + + @After + public void teardown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testAcquireAndRelease() throws Exception { + List testProcedures = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + testProcedures.add(new FakeServerProcedure( + TEST_UTIL.getHBaseCluster().getServerHoldingMeta())); + } + ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0)); + Assert.assertNotNull(server); + Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1))); + Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2))); + + Exception e = null; + try { + splitWALManager.acquireSplitWALWorker(testProcedures.get(3)); + } catch (ProcedureSuspendedException suspendException) { + e = suspendException; + } + Assert.assertNotNull(e); + Assert.assertTrue(e instanceof ProcedureSuspendedException); + + splitWALManager.releaseSplitWALWorker(server, TEST_UTIL.getHBaseCluster().getMaster() + 
.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); + } + + @Test + public void testAddNewServer() throws Exception { + List testProcedures = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + testProcedures.add(new FakeServerProcedure( + TEST_UTIL.getHBaseCluster().getServerHoldingMeta())); + } + ServerName server = splitWALManager.acquireSplitWALWorker(testProcedures.get(0)); + Assert.assertNotNull(server); + Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(1))); + Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(2))); + + Exception e = null; + try { + splitWALManager.acquireSplitWALWorker(testProcedures.get(3)); + } catch (ProcedureSuspendedException suspendException) { + e = suspendException; + } + Assert.assertNotNull(e); + Assert.assertTrue(e instanceof ProcedureSuspendedException); + + JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer(); + newServer.waitForServerOnline(); + Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); + } + + @Test + public void testCreateSplitWALProcedures() throws Exception { + TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); + // load table + TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); + ProcedureExecutor masterPE = master.getMasterProcedureExecutor(); + ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); + Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(), + AbstractFSWALProvider.getWALDirectoryName(metaServer.toString())); + // Test splitting meta wal + FileStatus[] wals = + TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER); + Assert.assertEquals(1, wals.length); + List testProcedures = + splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), 
metaServer); + Assert.assertEquals(1, testProcedures.size()); + ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); + Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); + + // Test splitting wal + wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER); + Assert.assertEquals(1, wals.length); + testProcedures = + splitWALManager.createSplitWALProcedures(Lists.newArrayList(wals[0]), metaServer); + Assert.assertEquals(1, testProcedures.size()); + ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); + Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); + } + + @Test + public void testAcquireAndReleaseSplitWALWorker() throws Exception { + ProcedureExecutor masterPE = master.getMasterProcedureExecutor(); + List testProcedures = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + FakeServerProcedure procedure = + new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName()); + testProcedures.add(procedure); + ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE, + HConstants.NO_NONCE); + } + TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired()); + FakeServerProcedure failedProcedure = + new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()); + ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE, + HConstants.NO_NONCE); + TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire()); + Assert.assertFalse(failedProcedure.isWorkerAcquired()); + // let one procedure finish and release worker + testProcedures.get(0).countDown(); + TEST_UTIL.waitFor(10000, () -> failedProcedure.isWorkerAcquired()); + Assert.assertTrue(testProcedures.get(0).isSuccess()); + } + + @Test + public void testGetWALsToSplit() throws Exception { + TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); + // load table + 
TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); + ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); + List metaWals = splitWALManager.getWALsToSplit(metaServer, true); + Assert.assertEquals(1, metaWals.size()); + List wals = splitWALManager.getWALsToSplit(metaServer, false); + Assert.assertEquals(1, wals.size()); + ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() + .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny() + .get(); + metaWals = splitWALManager.getWALsToSplit(testServer, true); + Assert.assertEquals(0, metaWals.size()); + } + + @Test + public void testSplitLogs() throws Exception { + TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); + // load table + TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); + ProcedureExecutor masterPE = master.getMasterProcedureExecutor(); + ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); + ServerName testServer = TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream() + .map(rs -> rs.getRegionServer().getServerName()).filter(rs -> rs != metaServer).findAny() + .get(); + List procedures = splitWALManager.splitWALs(testServer, false); + Assert.assertEquals(1, procedures.size()); + ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0)); + Assert.assertEquals(0, splitWALManager.getWALsToSplit(testServer, false).size()); + + procedures = splitWALManager.splitWALs(metaServer, true); + Assert.assertEquals(1, procedures.size()); + ProcedureTestingUtility.submitAndWait(masterPE, procedures.get(0)); + Assert.assertEquals(0, splitWALManager.getWALsToSplit(metaServer, true).size()); + Assert.assertEquals(1, splitWALManager.getWALsToSplit(metaServer, false).size()); + } + + @Test + public void testWorkerReloadWhenMasterRestart() throws Exception { + List testProcedures = new ArrayList<>(); + for (int i = 0; i < 
3; i++) { + FakeServerProcedure procedure = + new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName()); + testProcedures.add(procedure); + ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), procedure, + HConstants.NO_NONCE, HConstants.NO_NONCE); + } + TEST_UTIL.waitFor(10000, () -> testProcedures.get(2).isWorkerAcquired()); + // Kill master + TEST_UTIL.getHBaseCluster().killMaster(master.getServerName()); + TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000); + // restart master + TEST_UTIL.getHBaseCluster().startMaster(); + TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); + this.master = TEST_UTIL.getHBaseCluster().getMaster(); + + FakeServerProcedure failedProcedure = + new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta()); + ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), failedProcedure, + HConstants.NO_NONCE, HConstants.NO_NONCE); + TEST_UTIL.waitFor(20000, () -> failedProcedure.isTriedToAcquire()); + Assert.assertFalse(failedProcedure.isWorkerAcquired()); + for (int i = 0; i < 3; i++) { + testProcedures.get(i).countDown(); + } + failedProcedure.countDown(); + } + + public static final class FakeServerProcedure + extends StateMachineProcedure + implements ServerProcedureInterface { + + private ServerName serverName; + private ServerName worker; + private CountDownLatch barrier = new CountDownLatch(1); + private boolean triedToAcquire = false; + + public FakeServerProcedure() { + } + + public FakeServerProcedure(ServerName serverName) { + this.serverName = serverName; + } + + public ServerName getServerName() { + return serverName; + } + + @Override + public boolean hasMetaTableRegion() { + return false; + } + + @Override + public ServerOperationType getServerOperationType() { + return SPLIT_WAL; + } + + @Override + protected Flow executeFromState(MasterProcedureEnv env, + MasterProcedureProtos.SplitWALState state) + 
throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + SplitWALManager splitWALManager = env.getMasterServices().getSplitWALManager(); + switch (state) { + case ACQUIRE_SPLIT_WAL_WORKER: + triedToAcquire = true; + worker = splitWALManager.acquireSplitWALWorker(this); + setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER); + return Flow.HAS_MORE_STATE; + case DISPATCH_WAL_TO_WORKER: + barrier.await(); + setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER); + return Flow.HAS_MORE_STATE; + case RELEASE_SPLIT_WORKER: + splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler()); + return Flow.NO_MORE_STATE; + default: + throw new UnsupportedOperationException("unhandled state=" + state); + } + } + + public boolean isWorkerAcquired() { + return worker != null; + } + + public boolean isTriedToAcquire() { + return triedToAcquire; + } + + public void countDown() { + this.barrier.countDown(); + } + + @Override + protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state) + throws IOException, InterruptedException { + + } + + @Override + protected MasterProcedureProtos.SplitWALState getState(int stateId) { + return MasterProcedureProtos.SplitWALState.forNumber(stateId); + } + + @Override + protected int getStateId(MasterProcedureProtos.SplitWALState state) { + return state.getNumber(); + } + + @Override + protected MasterProcedureProtos.SplitWALState getInitialState() { + return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER; + } + + @Override + protected boolean holdLock(MasterProcedureEnv env) { + return true; + } + + @Override + protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException { + + } + + @Override + protected boolean abort(MasterProcedureEnv env) { + return false; + } + + @Override + protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException { + 
MasterProcedureProtos.SplitWALData.Builder builder = + MasterProcedureProtos.SplitWALData.newBuilder(); + builder.setWalPath("test").setCrashedServer(ProtobufUtil.toServerName(serverName)); + serializer.serialize(builder.build()); + } + + @Override + protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException { + MasterProcedureProtos.SplitWALData data = + serializer.deserialize(MasterProcedureProtos.SplitWALData.class); + serverName = ProtobufUtil.toServerName(data.getCrashedServer()); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java index af2076e2e8..b852e46ff1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -44,9 +46,13 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) @Category({MasterTests.class, LargeTests.class}) public class TestServerCrashProcedure { @@ -58,6 +64,9 @@ public class TestServerCrashProcedure { protected HBaseTestingUtility util; + @Parameter + public boolean splitWALCoordinatedByZK; + private ProcedureMetrics serverCrashProcMetrics; private long serverCrashSubmittedCount = 0; private long 
serverCrashFailedCount = 0; @@ -67,6 +76,8 @@ public class TestServerCrashProcedure { conf.set("hbase.balancer.tablesOnMaster", "none"); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 3); + LOG.info("WAL splitting coordinated by zk? {}", splitWALCoordinatedByZK); + conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, splitWALCoordinatedByZK); } @Before @@ -173,7 +184,8 @@ public class TestServerCrashProcedure { @Test public void testConcurrentSCPForSameServer() throws Exception { - final TableName tableName = TableName.valueOf("testConcurrentSCPForSameServer"); + final TableName tableName = + TableName.valueOf("testConcurrentSCPForSameServer-" + splitWALCoordinatedByZK); try (Table t = createTable(tableName)) { // Load the table with a bit of data so some logs to split and some edits in each region. this.util.loadTable(t, HBaseTestingUtility.COLUMNS[0]); @@ -222,4 +234,9 @@ public class TestServerCrashProcedure { serverCrashSubmittedCount = serverCrashProcMetrics.getSubmittedCounter().getCount(); serverCrashFailedCount = serverCrashProcMetrics.getFailedCounter().getCount(); } + + @Parameterized.Parameters + public static Collection coordinatedByZK() { + return Arrays.asList(false, true); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSplitWALProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSplitWALProcedure.java new file mode 100644 index 0000000000..c27899b704 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSplitWALProcedure.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master.procedure; + +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; + +import java.util.List; +import java.util.Optional; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.master.SplitWALManager; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ MasterTests.class, MediumTests.class }) +public class TestSplitWALProcedure { + @ClassRule + public static final HBaseClassTestRule 
CLASS_RULE = + HBaseClassTestRule.forClass(TestSplitWALProcedure.class); + + private static HBaseTestingUtility TEST_UTIL; + private HMaster master; + private TableName TABLE_NAME; + private SplitWALManager splitWALManager; + private byte[] FAMILY; + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); + TEST_UTIL.getConfiguration().setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, 1); + TEST_UTIL.startMiniCluster(3); + master = TEST_UTIL.getHBaseCluster().getMaster(); + splitWALManager = master.getSplitWALManager(); + TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALProcedure")); + FAMILY = Bytes.toBytes("test"); + } + + @After + public void teardown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testHandleDeadWorker() throws Exception { + Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); + for (int i = 0; i < 100; i++) { + TEST_UTIL.loadTable(table, FAMILY); + } + HRegionServer testServer = TEST_UTIL.getHBaseCluster().getRegionServer(0); + List wals = splitWALManager.getWALsToSplit(testServer.getServerName(), false); + Assert.assertEquals(1, wals.size()); + SplitWALProcedure splitWALProcedure = + new SplitWALProcedure(wals.get(0).getPath().toString(), testServer.getServerName()); + ProcedureExecutor masterPE = master.getMasterProcedureExecutor(); + long pid = ProcedureTestingUtility.submitProcedure(masterPE, splitWALProcedure, + HConstants.NO_NONCE, HConstants.NO_NONCE); + TEST_UTIL.waitFor(5000, () -> splitWALProcedure.getWorker() != null); + TEST_UTIL.getHBaseCluster().killRegionServer(splitWALProcedure.getWorker()); + ProcedureTestingUtility.waitProcedure(masterPE, pid); + Assert.assertTrue(splitWALProcedure.isSuccess()); + } + + @Test + public void testMasterRestart() throws Exception { + Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILY, 
TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); + for (int i = 0; i < 100; i++) { + TEST_UTIL.loadTable(table, FAMILY); + } + HRegionServer testServer = TEST_UTIL.getHBaseCluster().getRegionServer(0); + List wals = splitWALManager.getWALsToSplit(testServer.getServerName(), false); + Assert.assertEquals(1, wals.size()); + SplitWALProcedure splitWALProcedure = + new SplitWALProcedure(wals.get(0).getPath().toString(), testServer.getServerName()); + long pid = ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(), + splitWALProcedure, HConstants.NO_NONCE, HConstants.NO_NONCE); + TEST_UTIL.waitFor(5000, () -> splitWALProcedure.getWorker() != null); + // Kill master + TEST_UTIL.getHBaseCluster().killMaster(master.getServerName()); + TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000); + // restart master + TEST_UTIL.getHBaseCluster().startMaster(); + TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster(); + this.master = TEST_UTIL.getHBaseCluster().getMaster(); + ProcedureTestingUtility.waitProcedure(master.getMasterProcedureExecutor(), pid); + Optional> procedure = + master.getProcedures().stream().filter(p -> p.getProcId() == pid).findAny(); + // make sure procedure is successful and wal is deleted + Assert.assertTrue(procedure.isPresent()); + Assert.assertTrue(procedure.get().isSuccess()); + Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals.get(0).getPath())); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index cbf932c568..14dc619d57 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import static 
org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; import static org.junit.Assert.assertEquals; @@ -449,7 +450,7 @@ public class TestSplitLogWorker { final ServerName RS = ServerName.valueOf("rs,1,1"); final int maxTasks = 3; Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); - testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks); + testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks); RegionServerServices mockedRS = getRegionServer(RS); for (int i = 0; i < maxTasks; i++) { zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i), @@ -485,7 +486,7 @@ public class TestSplitLogWorker { final ServerName RS2 = ServerName.valueOf("rs,1,2"); final int maxTasks = 3; Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); - testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks); + testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks); RegionServerServices mockedRS = getRegionServer(RS); // create two RS nodes -- 2.17.1