From adf80e8b31af4f6e101728b408cd572f2ac829ec Mon Sep 17 00:00:00 2001 From: Jingyun Tian Date: Wed, 19 Dec 2018 18:39:37 +0800 Subject: [PATCH] HBASE-21588 Procedure v2 wal splitting implementation --- .../java/org/apache/hadoop/hbase/HConstants.java | 4 + .../hbase/procedure2/LockedResourceType.java | 2 +- .../src/main/protobuf/MasterProcedure.proto | 24 ++ .../coordination/SplitLogWorkerCoordination.java | 4 +- .../apache/hadoop/hbase/executor/EventType.java | 9 +- .../apache/hadoop/hbase/executor/ExecutorType.java | 3 +- .../org/apache/hadoop/hbase/master/HMaster.java | 5 + .../hadoop/hbase/master/MasterWalManager.java | 70 ++++- .../hadoop/hbase/master/SplitLogManager.java | 91 +++++++ .../master/procedure/MasterProcedureScheduler.java | 38 ++- .../hbase/master/procedure/SchemaLocking.java | 13 + .../master/procedure/ServerCrashProcedure.java | 72 ++++- .../hbase/master/procedure/SplitWALProcedure.java | 175 ++++++++++++ .../procedure/SplitWALProcedureInterface.java | 40 +++ .../hbase/master/procedure/SplitWALQueue.java | 35 +++ .../master/procedure/SplitWALRemoteProcedure.java | 168 ++++++++++++ .../hadoop/hbase/regionserver/HRegionServer.java | 13 +- .../hbase/regionserver/SplitWALCallable.java | 294 +++++++++++++++++++++ .../regionserver/WrongTargetServerException.java | 37 +++ .../hadoop/hbase/master/TestMasterWalManager.java | 269 +++++++++++++++++++ .../master/TestSplitLogManagerForProcedureV2.java | 160 +++++++++++ .../master/procedure/TestServerCrashProcedure.java | 22 +- .../master/procedure/TestSplitWALProcedure.java | 130 +++++++++ 23 files changed, 1661 insertions(+), 17 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedureInterface.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALQueue.java create mode 100644 
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/WrongTargetServerException.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitLogManagerForProcedureV2.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSplitWALProcedure.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index fdc3d82..4662404 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1311,6 +1311,10 @@ public final class HConstants { public static final String HBASE_CLIENT_FAST_FAIL_INTERCEPTOR_IMPL = "hbase.client.fast.fail.interceptor.impl"; + public static final String HBASE_SPLIT_WAL_COORDINATED_BY_ZK = "hbase.split.wal.zk.coordinated"; + + public static final boolean DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK = true; + /** Config key for if the server should send backpressure and if the client should listen to * that backpressure from the server */ public static final String ENABLE_CLIENT_BACKPRESSURE = "hbase.client.backpressure.enabled"; diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java index 55d195b..176d001 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/LockedResourceType.java @@ -22,5 +22,5 @@ import 
org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private public enum LockedResourceType { - SERVER, NAMESPACE, TABLE, REGION, PEER, META + SERVER, NAMESPACE, TABLE, REGION, PEER, META, SPLIT_WAL } diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index cc0c6ba..30a153c 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -308,6 +308,8 @@ enum ServerCrashState { SERVER_CRASH_WAIT_ON_ASSIGN = 9; SERVER_CRASH_SPLIT_META_LOGS = 10; SERVER_CRASH_ASSIGN_META = 11; + SERVER_CRASH_SPLIT_META_WAL_CHECK=12; + SERVER_CRASH_SPLIT_WAL_CHECK=13; SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true]; SERVER_CRASH_FINISH = 100; } @@ -551,3 +553,25 @@ message OpenRegionProcedureStateData { message CloseRegionProcedureStateData { optional ServerName assign_candidate = 1; } + +message SplitWALParameter { + required ServerName serverName = 1; + required string walPath = 2; +} + + +message SplitWALData{ + required string walPath = 1; + optional ServerName serverName = 2; +} + +message SplitWALRemoteData{ + required ServerName serverName = 1; + required string walPath = 2; +} + +enum SplitWALState{ + ACQUIRE_SPLIT_WAL_WORKER = 1; + DISPATCH_WAL_TO_WORKER = 2; + RELEASE_SPLIT_WORKER = 3; +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java index ab04f60..8354a15 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/SplitLogWorkerCoordination.java @@ -103,12 +103,12 @@ public interface SplitLogWorkerCoordination { /** * set the listener for task changes. 
Implementation specific */ - void registerListener(); + default void registerListener(){} /** * remove the listener for task changes. Implementation specific */ - void removeListener(); + default void removeListener(){} /* WALSplitterHandler part */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java index ad38d1c..4c35011 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java @@ -288,7 +288,14 @@ public enum EventType { * * RS_REPLAY_SYNC_REPLICATION_WAL */ - RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL); + RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL), + + /** + * RS split wal that under splitting dirs + * + * RS_LOG_SPLITTING + */ + RS_LOG_SPLITTING(86, ExecutorType.RS_WAL_SPLITTING); private final int code; private final ExecutorType executor; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java index ea97354..ad36746 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java @@ -48,7 +48,8 @@ public enum ExecutorType { RS_COMPACTED_FILES_DISCHARGER (29), RS_OPEN_PRIORITY_REGION (30), RS_REFRESH_PEER(31), - RS_REPLAY_SYNC_REPLICATION_WAL(32); + RS_REPLAY_SYNC_REPLICATION_WAL(32), + RS_WAL_SPLITTING(33); ExecutorType(int value) { } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 0bcef59..6d4becc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -17,7 +17,9 @@ */ package org.apache.hadoop.hbase.master; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; import com.google.protobuf.Descriptors; import com.google.protobuf.Service; @@ -945,6 +947,9 @@ public class HMaster extends HRegionServer implements MasterServices { // loading. this.serverManager = createServerManager(this); this.syncReplicationReplayWALManager = new SyncReplicationReplayWALManager(this); + if (!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + this.walManager.getSplitLogManager().registerSplitWorkerAssigner(); + } createProcedureExecutor(); @SuppressWarnings("rawtypes") Map, List>> procsByType = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java index 5ab1c28..88bd6c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java @@ -28,6 +28,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import java.util.stream.Stream; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -35,6 +36,9 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; +import org.apache.hadoop.hbase.procedure2.Procedure; +import 
org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; @@ -44,6 +48,7 @@ import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; /** * This class abstracts a bunch of operations the HMaster needs @@ -60,7 +65,8 @@ public class MasterWalManager { } }; - final static PathFilter NON_META_FILTER = new PathFilter() { + @VisibleForTesting + public final static PathFilter NON_META_FILTER = new PathFilter() { @Override public boolean accept(Path p) { return !AbstractFSWALProvider.isMetaFile(p); @@ -396,4 +402,66 @@ public class MasterWalManager { } + public Procedure[] splitLogs(ServerName serverName, boolean splitMeta) + throws IOException { + List walSplittingProcedures = null; + try { + // 1. list all splitting files + List splittingFiles = getWALsToSplit(serverName, splitMeta); + // 2. create corresponding procedures + walSplittingProcedures = splitLogManager.createSplitWALProcedures(splittingFiles); + } catch (IOException e) { + LOG.error("failed to create procedures for splitting logs of {}", serverName, e); + throw e; + } + + return walSplittingProcedures == null ? new Procedure[0] + : walSplittingProcedures.toArray(new Procedure[walSplittingProcedures.size()]); + } + + public List getWALsToSplit(ServerName serverName, boolean splitMeta) + throws IOException { + List logDirs = getLogDirs(Collections.singleton(serverName)); + FileStatus[] fileStatuses = + SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? 
META_FILTER : NON_META_FILTER); + LOG.info("size of WALs of {} is {}", serverName, fileStatuses.length); + return Lists.newArrayList(fileStatuses); + } + + private Path getWALSplitDir(ServerName serverName) { + Path logDir = + new Path(this.rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString())); + return logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); + } + + public void deleteSplitWal(String wal) { + try { + fs.delete(new Path(wal), false); + } catch (IOException e) { + LOG.error("remove wal {} failed, ignore...", wal); + } + } + + public void deleteWALDir(ServerName serverName) { + Path splitDir = getWALSplitDir(serverName); + try { + fs.delete(splitDir, false); + } catch (IOException e) { + LOG.warn("remove wal Dir {} failed, ignore...", splitDir); + } + + } + + public ServerName acquireSplitWALWorker(Procedure procedure) throws ProcedureSuspendedException { + return splitLogManager.acquireSplitWorker(procedure); + } + + public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) { + splitLogManager.releaseSplitWorker(worker, scheduler); + } + + public boolean isSplitWALFinished(String wal) throws IOException { + Path walPath = new Path(rootDir, wal); + return !fs.exists(walPath); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java index 4d977d3..49ff5ba 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hbase.master; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; import static org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.CHECK; import static 
org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective.FORCE; import static org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus.DELETED; @@ -31,6 +33,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -49,8 +52,13 @@ import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination; import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails; import org.apache.hadoop.hbase.log.HBaseMarkers; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; +import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HasThread; @@ -110,6 +118,7 @@ public class SplitLogManager { private volatile Set deadWorkers = null; private final Object deadWorkersLock = new Object(); + private SplitWorkerAssigner splitWorkerAssigner; /** * Its OK to construct this object even when region-servers are not online. It does lookup the * orphan tasks in coordination engine but it doesn't block waiting for them to be done. 
@@ -140,6 +149,11 @@ public class SplitLogManager { new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), master); choreService.scheduleChore(timeoutMonitor); + if(!conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + this.splitWorkerAssigner = new SplitWorkerAssigner(server, conf.getInt( + "hbase.regionserver.wal.max.splitters", 3)); + } } private SplitLogManagerCoordination getSplitLogManagerCoordination() { @@ -183,6 +197,83 @@ public class SplitLogManager { return fileStatus.toArray(a); } + public List createSplitWALProcedures(List splittingWALs) { + List logSplittingProcedures = new ArrayList<>(); + for (FileStatus splittingFile : splittingWALs) { + SplitWALProcedure procedure = + new SplitWALProcedure(splittingFile.getPath().toString()); + logSplittingProcedures.add(procedure); + } + return logSplittingProcedures; + } + + public ServerName acquireSplitWorker(Procedure procedure) throws ProcedureSuspendedException { + Optional worker = splitWorkerAssigner.acquire(); + if (worker.isPresent()) { + return worker.get(); + } + splitWorkerAssigner.suspend(procedure); + throw new ProcedureSuspendedException(); + } + + public void releaseSplitWorker(ServerName worker, MasterProcedureScheduler scheduler) { + splitWorkerAssigner.release(worker); + splitWorkerAssigner.wake(scheduler); + } + + + public void registerSplitWorkerAssigner(){ + this.splitWorkerAssigner.registerListener(); + } + + public static final class SplitWorkerAssigner implements ServerListener { + private int maxSplitTasks; + private final ProcedureEvent event; + private Map currentWorkers = new ConcurrentHashMap<>(); + private MasterServices master; + + public SplitWorkerAssigner(MasterServices master, int maxSplitTasks) { + this.maxSplitTasks = maxSplitTasks; + this.master = master; + this.event = new ProcedureEvent<>("split-wal-worker-assigning"); + } + + public void registerListener(){ + 
master.getServerManager().registerListener(this); + } + + public Optional acquire() { + Optional worker = master.getServerManager().getOnlineServers().keySet().stream().filter( + serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0) + .findAny(); + if (worker.isPresent()) { + currentWorkers.compute(worker.get(), (serverName, + availableWorker) -> availableWorker == null ? maxSplitTasks - 1 : availableWorker - 1); + } + return worker; + } + + public void release(ServerName serverName) { + currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1); + } + + public void suspend(Procedure proc) { + event.suspend(); + event.suspendIfNotReady(proc); + } + + public void wake(MasterProcedureScheduler scheduler) { + if (!event.isReady()) { + event.wake(scheduler); + } + } + + @Override + public void serverAdded(ServerName serverName) { + this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + } + } + /** * @param logDir one region sever wal dir path in .logs * @throws IOException if there was an error while splitting any log file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 4bf16ec..6b3ef2b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -96,6 +96,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti @InterfaceAudience.Private public class MasterProcedureScheduler extends AbstractProcedureScheduler { private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class); + private static final String SPLIT_WAL_QUEUE = "SPLIT_WAL_QUEUE"; private static final AvlKeyComparator SERVER_QUEUE_KEY_COMPARATOR = 
(n, k) -> n.compareKey((ServerName) k); @@ -105,16 +106,20 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { (n, k) -> n.compareKey((String) k); private final static AvlKeyComparator META_QUEUE_KEY_COMPARATOR = (n, k) -> n.compareKey((TableName) k); + private final static AvlKeyComparator SPLIT_WAL_QUEUE_KEY_COMPARATOR = + (n, k) -> n.compareKey((String) k); private final FairQueue serverRunQueue = new FairQueue<>(); private final FairQueue tableRunQueue = new FairQueue<>(); private final FairQueue peerRunQueue = new FairQueue<>(); private final FairQueue metaRunQueue = new FairQueue<>(); + private final FairQueue splitWalRunQueue = new FairQueue<>(); private final ServerQueue[] serverBuckets = new ServerQueue[128]; private TableQueue tableMap = null; private PeerQueue peerMap = null; private MetaQueue metaMap = null; + private SplitWALQueue splitWalMap = null; private final SchemaLocking locking; @@ -138,6 +143,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront); } else if (isPeerProcedure(proc)) { doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront); + } else if (isSplitWalProcedure(proc)){ + doAdd(splitWalRunQueue, getSplitWALQueue(), proc, addFront); } else { // TODO: at the moment we only have Table and Server procedures // if you are implementing a non-table/non-server procedure, you have two options: create @@ -173,8 +180,9 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { @Override protected boolean queueHasRunnables() { - return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() || - serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables(); + return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() + || serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables() + || splitWalRunQueue.hasRunnables(); } @Override @@ -188,6 +196,9 @@ public class 
MasterProcedureScheduler extends AbstractProcedureScheduler { pollResult = doPoll(serverRunQueue); } if (pollResult == null) { + pollResult = doPoll(splitWalRunQueue); + } + if (pollResult == null) { pollResult = doPoll(peerRunQueue); } if (pollResult == null) { @@ -278,6 +289,10 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR); peerMap = null; + // Remove SplitWal + clear(splitWalMap, splitWalRunQueue, SPLIT_WAL_QUEUE_KEY_COMPARATOR); + splitWalMap = null; + assert size() == 0 : "expected queue size to be 0, got " + size(); } @@ -310,6 +325,7 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { count += queueSize(tableMap); count += queueSize(peerMap); count += queueSize(metaMap); + count += queueSize(splitWalMap); return count; } @@ -496,6 +512,24 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } // ============================================================================ + // Split Wal Queue Lookup Helpers + // ============================================================================ + private SplitWALQueue getSplitWALQueue(){ + SplitWALQueue node = AvlTree.get(splitWalMap, SPLIT_WAL_QUEUE, SPLIT_WAL_QUEUE_KEY_COMPARATOR); + if (node != null) { + return node; + } + node = new SplitWALQueue(SPLIT_WAL_QUEUE, locking.getSplitWalLock()); + splitWalMap = AvlTree.insert(splitWalMap, node); + return node; + } + + private boolean isSplitWalProcedure(Procedure proc) { + return proc instanceof SplitWALProcedureInterface; + } + + + // ============================================================================ // Meta Queue Lookup Helpers // ============================================================================ private MetaQueue getMetaQueue() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java index 70e7c59..b5d6ee1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SchemaLocking.java @@ -46,6 +46,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; @InterfaceAudience.Private class SchemaLocking { + private static final String SPLIT_WAL_LOCK = "SPLIT_WAL_LOCK"; private final Function> procedureRetriever; private final Map serverLocks = new HashMap<>(); private final Map namespaceLocks = new HashMap<>(); @@ -53,11 +54,13 @@ class SchemaLocking { // Single map for all regions irrespective of tables. Key is encoded region name. private final Map regionLocks = new HashMap<>(); private final Map peerLocks = new HashMap<>(); + private final LockAndQueue splitWalLock; private final LockAndQueue metaLock; public SchemaLocking(Function> procedureRetriever) { this.procedureRetriever = procedureRetriever; this.metaLock = new LockAndQueue(procedureRetriever); + this.splitWalLock = new LockAndQueue(procedureRetriever); } private LockAndQueue getLock(Map map, T key) { @@ -114,6 +117,10 @@ class SchemaLocking { return peerLocks.remove(peerId); } + LockAndQueue getSplitWalLock() { + return splitWalLock; + } + private LockedResource createLockedResource(LockedResourceType resourceType, String resourceName, LockAndQueue queue) { LockType lockType; @@ -164,6 +171,8 @@ class SchemaLocking { addToLockedResources(lockedResources, peerLocks, Function.identity(), LockedResourceType.PEER); addToLockedResources(lockedResources, ImmutableMap.of(TableName.META_TABLE_NAME, metaLock), tn -> tn.getNameAsString(), LockedResourceType.META); + addToLockedResources(lockedResources, ImmutableMap.of(SPLIT_WAL_LOCK, splitWalLock), + Function.identity(), LockedResourceType.SPLIT_WAL); return lockedResources; } @@ -189,8 +198,12 @@ class SchemaLocking { case 
PEER: queue = peerLocks.get(resourceName); break; + case SPLIT_WAL: + queue = splitWalLock; + break; case META: queue = metaLock; + break; default: queue = null; break; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java index 05bcd28..6cff5d2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java @@ -17,10 +17,14 @@ */ package org.apache.hadoop.hbase.master.procedure; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; + import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; + import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; @@ -31,6 +35,7 @@ import org.apache.hadoop.hbase.master.MasterWalManager; import org.apache.hadoop.hbase.master.assignment.AssignmentManager; import org.apache.hadoop.hbase.master.assignment.RegionStateNode; import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; +import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureMetrics; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; @@ -107,6 +112,7 @@ public class ServerCrashProcedure protected Flow executeFromState(MasterProcedureEnv env, ServerCrashState state) throws ProcedureSuspendedException, ProcedureYieldException { final MasterServices services = env.getMasterServices(); + final AssignmentManager am = env.getAssignmentManager(); // HBASE-14802 // If we have not yet 
notified that we are processing a dead server, we should do now. if (!notifiedDeadServer) { @@ -117,6 +123,7 @@ public class ServerCrashProcedure switch (state) { case SERVER_CRASH_START: case SERVER_CRASH_SPLIT_META_LOGS: + case SERVER_CRASH_SPLIT_META_WAL_CHECK: case SERVER_CRASH_ASSIGN_META: break; default: @@ -137,8 +144,24 @@ public class ServerCrashProcedure } break; case SERVER_CRASH_SPLIT_META_LOGS: - splitMetaLogs(env); - setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META); + if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + splitMetaLogs(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META); + } else { + am.getRegionStates().metaLogSplitting(serverName); + addChildProcedure(createSplittingWalProcedures(env, true)); + setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_WAL_CHECK); + } + break; + case SERVER_CRASH_SPLIT_META_WAL_CHECK: + if(isSplittingDone(env, true)){ + cleanupSplitDir(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META); + am.getRegionStates().metaLogSplit(serverName); + } else { + setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS); + } break; case SERVER_CRASH_ASSIGN_META: assignRegions(env, Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO)); @@ -156,8 +179,24 @@ public class ServerCrashProcedure } break; case SERVER_CRASH_SPLIT_LOGS: - splitLogs(env); - setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + if (env.getMasterConfiguration().getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + splitLogs(env); + setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + } else { + am.getRegionStates().logSplitting(this.serverName); + addChildProcedure(createSplittingWalProcedures(env, false)); + setNextState(ServerCrashState.SERVER_CRASH_SPLIT_WAL_CHECK); + } + break; + case SERVER_CRASH_SPLIT_WAL_CHECK: + if (isSplittingDone(env, false)) { + cleanupSplitDir(env); + 
setNextState(ServerCrashState.SERVER_CRASH_ASSIGN); + am.getRegionStates().logSplit(this.serverName); + } else { + setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS); + } break; case SERVER_CRASH_ASSIGN: // If no regions to assign, skip assign and skip to the finish. @@ -179,6 +218,7 @@ public class ServerCrashProcedure setNextState(ServerCrashState.SERVER_CRASH_FINISH); break; case SERVER_CRASH_FINISH: + LOG.info("removed crashed server {} after splitting done", serverName); services.getAssignmentManager().getRegionStates().removeServer(serverName); services.getServerManager().getDeadServers().finish(serverName); return Flow.NO_MORE_STATE; @@ -191,6 +231,30 @@ public class ServerCrashProcedure return Flow.HAS_MORE_STATE; } + private void cleanupSplitDir(MasterProcedureEnv env) { + MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); + mwm.deleteWALDir(serverName); + } + + private boolean isSplittingDone(MasterProcedureEnv env, boolean splitMeta) { + LOG.info("start to check if splitting finished"); + MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); + try { + return mwm.getWALsToSplit(serverName, splitMeta).size() == 0; + } catch (IOException e) { + LOG.error("get filelist of serverName {} failed", serverName); + return false; + } + } + + private Procedure[] createSplittingWalProcedures(MasterProcedureEnv env, boolean splitMeta) + throws IOException { + LOG.info("Splitting WALs {}, isMeta: {}", this, splitMeta); + MasterWalManager mwm = env.getMasterServices().getMasterWalManager(); + Procedure[] procedures = mwm.splitLogs(serverName, splitMeta); + return procedures; + } + private boolean filterDefaultMetaRegions() { if (regionsOnCrashedServer == null) { return false; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java new file mode 100644 index 0000000..12f0548 --- /dev/null +++ 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;

import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterWalManager;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;

/**
 * The procedure to split one WAL of a crashed region server.
 * <p>
 * It acquires an available region server as the split worker, schedules a
 * {@link SplitWALRemoteProcedure} to ask that server to do the actual split, then checks whether
 * the split really succeeded. If the WAL file still exists after the remote procedure finishes,
 * another worker is acquired and the WAL is dispatched again.
 */
@InterfaceAudience.Private
public class SplitWALProcedure
    extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
    implements SplitWALProcedureInterface {
  private static final Logger LOG = LoggerFactory.getLogger(SplitWALProcedure.class);

  /** Path of the WAL to split, relative to the WAL root directory. */
  private String walPath;
  /** Region server currently assigned to split {@link #walPath}; null until acquired. */
  private ServerName worker;
  /** Number of failed "is split finished" checks so far; drives the exponential backoff. */
  private int attempts = 0;

  public SplitWALProcedure() {
  }

  public SplitWALProcedure(String walPath) {
    this.walPath = walPath;
  }

  @Override
  protected Flow executeFromState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state)
      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
    MasterWalManager walManager = env.getMasterServices().getMasterWalManager();
    switch (state) {
      case ACQUIRE_SPLIT_WAL_WORKER:
        // May suspend this procedure until a worker slot becomes free.
        worker = walManager.acquireSplitWALWorker(this);
        setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
        return Flow.HAS_MORE_STATE;
      case DISPATCH_WAL_TO_WORKER:
        assert worker != null;
        addChildProcedure(new SplitWALRemoteProcedure(worker, walPath));
        setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
        return Flow.HAS_MORE_STATE;
      case RELEASE_SPLIT_WORKER:
        boolean finished;
        try {
          finished = walManager.isSplitWALFinished(walPath);
        } catch (IOException ioe) {
          // Checking the file system failed; back off and retry this same state later.
          // NOTE: the attempt counter is bumped only here -- suspend() no longer increments it
          // as well, which previously double-counted every failure and made the backoff grow
          // twice as fast as intended. The log format now has a placeholder for each argument
          // (the backoff value used to be silently dropped).
          long backoff = ProcedureUtil.getBackoffTimeMs(attempts++);
          LOG.warn("Failed to check whether splitting of wal {} succeeded, retry in {} seconds",
            walPath, backoff / 1000, ioe);
          throw suspend(backoff);
        }
        // Always hand the worker slot back, whether or not the split succeeded.
        walManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
        if (!finished) {
          LOG.warn("Failed to split wal {} by server {}, retry...", walPath, worker);
          setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER);
          return Flow.HAS_MORE_STATE;
        }
        return Flow.NO_MORE_STATE;
      default:
        throw new UnsupportedOperationException("unhandled state=" + state);
    }
  }

  @Override
  protected void rollbackState(MasterProcedureEnv env,
      MasterProcedureProtos.SplitWALState splitOneWalState)
      throws IOException, InterruptedException {
    // Nothing has been dispatched yet in the initial state, so rollback is a no-op there;
    // any later state cannot be rolled back.
    if (splitOneWalState == getInitialState()) {
      return;
    }
    throw new UnsupportedOperationException();
  }

  @Override
  protected MasterProcedureProtos.SplitWALState getState(int stateId) {
    return MasterProcedureProtos.SplitWALState.forNumber(stateId);
  }

  @Override
  protected int getStateId(MasterProcedureProtos.SplitWALState state) {
    return state.getNumber();
  }

  @Override
  protected MasterProcedureProtos.SplitWALState getInitialState() {
    return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
  }

  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.serializeStateData(serializer);
    MasterProcedureProtos.SplitWALData.Builder builder =
        MasterProcedureProtos.SplitWALData.newBuilder();
    builder.setWalPath(walPath);
    // worker is only present after ACQUIRE_SPLIT_WAL_WORKER has run.
    if (worker != null) {
      builder.setServerName(ProtobufUtil.toServerName(worker));
    }
    serializer.serialize(builder.build());
  }

  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    super.deserializeStateData(serializer);
    MasterProcedureProtos.SplitWALData data =
        serializer.deserialize(MasterProcedureProtos.SplitWALData.class);
    walPath = data.getWalPath();
    if (data.hasServerName()) {
      worker = ProtobufUtil.toServerName(data.getServerName());
    }
  }

  /**
   * Called when the backoff timeout set by {@link #suspend(long)} fires: re-queue this
   * procedure at the front of the scheduler instead of failing it.
   */
  @Override
  protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
    setState(ProcedureProtos.ProcedureState.RUNNABLE);
    env.getProcedureScheduler().addFront(this);
    return false;
  }

  /**
   * Suspend this procedure for {@code backoff} milliseconds.
   * <p>
   * Always throws {@link ProcedureSuspendedException}; the return type only exists so callers
   * can write {@code throw suspend(backoff)} and keep the compiler's flow analysis happy.
   * Does NOT touch {@link #attempts}; the caller owns the retry counter.
   */
  protected final ProcedureSuspendedException suspend(long backoff)
      throws ProcedureSuspendedException {
    setTimeout(Math.toIntExact(backoff));
    setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
    // No state worth persisting changed; skip the WAL write for this transition.
    skipPersistence();
    throw new ProcedureSuspendedException();
  }

  @Override
  public String getWAL() {
    return walPath;
  }

  @Override
  public SplitWALOperationType getSplitWALOperationType() {
    return SplitWALOperationType.SPLIT_WAL;
  }

  @VisibleForTesting
  public ServerName getWorker() {
    return worker;
  }
}
+ */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public interface SplitWALProcedureInterface { + enum SplitWALOperationType { + /** + * help find a available region server as worker and manage the split wal task flow. + */ + SPLIT_WAL, + + /** + * send the split WAL request to region server and handle the response + */ + SPLIT_WAL_REMOTE + } + + String getWAL(); + + SplitWALOperationType getSplitWALOperationType(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALQueue.java new file mode 100644 index 0000000..a475077 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALQueue.java @@ -0,0 +1,35 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.master.procedure; + +import org.apache.hadoop.hbase.procedure2.LockStatus; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class SplitWALQueue extends Queue { + public SplitWALQueue(String name, LockStatus lockStatus) { + super(name, lockStatus); + } + + @Override + boolean requireExclusiveLock(Procedure proc) { + return false; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java new file mode 100644 index 0000000..86dcfff --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALRemoteProcedure.java @@ -0,0 +1,168 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master.procedure;

import java.io.IOException;

import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.procedure2.NoNodeDispatchException;
import org.apache.hadoop.hbase.procedure2.NoServerDispatchException;
import org.apache.hadoop.hbase.procedure2.NullTargetServerDispatchException;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
import org.apache.hadoop.hbase.regionserver.SplitWALCallable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;

/**
 * A remote procedure which sends a split-WAL request to a region server and waits for the
 * response.
 * <p>
 * On success the WAL is deleted. On a {@link DoNotRetryIOException} (e.g. the request reached
 * the wrong server) the procedure also completes "successfully" -- the parent
 * {@link SplitWALProcedure} will notice the WAL still exists and re-dispatch it to another
 * worker. Any other failure leaves {@code success} false so the parent retries likewise.
 */
@InterfaceAudience.Private
public class SplitWALRemoteProcedure extends Procedure<MasterProcedureEnv>
    implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName>,
    SplitWALProcedureInterface {
  private static final Logger LOG = LoggerFactory.getLogger(SplitWALRemoteProcedure.class);

  /** WAL to split, relative to the WAL root directory. */
  private String walPath;
  /** Target region server that will perform the split. */
  private ServerName serverName;
  /** True once the request has been handed to the remote dispatcher. */
  private boolean dispatched;
  /** Event used to suspend this procedure until the remote call completes. */
  private ProcedureEvent<?> event;
  /** Outcome of the remote call; see class javadoc for the DoNotRetry special case. */
  private boolean success = false;

  public SplitWALRemoteProcedure() {
  }

  public SplitWALRemoteProcedure(ServerName serverName, String wal) {
    this.serverName = serverName;
    this.walPath = wal;
  }

  @Override
  protected Procedure[] execute(MasterProcedureEnv env)
      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
    if (dispatched) {
      if (success) {
        return null;
      }
      // The previous dispatch came back unsuccessful; fall through and dispatch again.
      dispatched = false;
    }
    try {
      env.getRemoteDispatcher().addOperationToNode(serverName, this);
    } catch (NoNodeDispatchException | NullTargetServerDispatchException
        | NoServerDispatchException e) {
      // The target server is gone; finish and let the parent procedure pick a new worker.
      LOG.error("dispatch log {} to {} failed ", walPath, serverName, e);
      return null;
    }
    dispatched = true;
    event = new ProcedureEvent<>(this);
    event.suspendIfNotReady(this);
    throw new ProcedureSuspendedException();
  }

  @Override
  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
    // A remote split cannot be undone.
    throw new UnsupportedOperationException();
  }

  @Override
  protected boolean abort(MasterProcedureEnv env) {
    return false;
  }

  @Override
  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    MasterProcedureProtos.SplitWALRemoteData.Builder builder =
        MasterProcedureProtos.SplitWALRemoteData.newBuilder();
    builder.setWalPath(walPath).setServerName(ProtobufUtil.toServerName(serverName));
    serializer.serialize(builder.build());
  }

  @Override
  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    MasterProcedureProtos.SplitWALRemoteData data =
        serializer.deserialize(MasterProcedureProtos.SplitWALRemoteData.class);
    walPath = data.getWalPath();
    serverName = ProtobufUtil.toServerName(data.getServerName());
  }

  @Override
  public RemoteProcedureDispatcher.RemoteOperation remoteCallBuild(MasterProcedureEnv env,
      ServerName serverName) {
    return new RSProcedureDispatcher.ServerOperation(this, getProcId(), SplitWALCallable.class,
        MasterProcedureProtos.SplitWALParameter.newBuilder()
            .setServerName(ProtobufUtil.toServerName(serverName)).setWalPath(walPath).build()
            .toByteArray());
  }

  @Override
  public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
      IOException exception) {
    complete(env, exception);
  }

  @Override
  public void remoteOperationCompleted(MasterProcedureEnv env) {
    complete(env, null);
  }

  /**
   * Record the outcome of the remote call and wake this procedure up.
   * @param error null on success, otherwise the failure reported by the dispatcher/server
   */
  private void complete(MasterProcedureEnv env, Throwable error) {
    if (event == null) {
      // Can happen if a late callback arrives after this procedure was reloaded from the
      // procedure store; execute() will simply re-dispatch.
      LOG.warn("procedure event for split of {} on {} is null, ignoring late completion",
        walPath, serverName);
      return;
    }
    if (error == null) {
      LOG.info("Split of {} on {} succeeded", walPath, serverName);
      env.getMasterServices().getMasterWalManager().deleteSplitWal(walPath);
      success = true;
    } else {
      if (error instanceof DoNotRetryIOException) {
        // Mark success so this remote procedure finishes; the parent SplitWALProcedure will
        // see the WAL still exists and schedule another worker.
        LOG.error("WAL split task send to a wrong server {}, will retry on another server",
          serverName, error);
        success = true;
      } else {
        // Include the WAL, server and cause; the original message carried no context at all.
        LOG.error("Split WAL {} failed on server {}, will be retried", walPath, serverName,
          error);
        success = false;
      }
    }
    event.wake(env.getProcedureScheduler());
    event = null;
  }

  @Override
  public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
    LOG.error("Remote call failed for {} on {}", walPath, serverName, error);
    complete(env, error);
  }

  @Override
  public String getWAL() {
    return this.walPath;
  }

  @Override
  public SplitWALOperationType getSplitWALOperationType() {
    return SplitWALOperationType.SPLIT_WAL_REMOTE;
  }
}
13f277b..30d041c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.regionserver; +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; + import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.management.MemoryType; @@ -619,7 +622,10 @@ public class HRegionServer extends HasThread implements rpcServices.isa.getPort(), this, canCreateBaseZNode()); // If no master in cluster, skip trying to track one or look for a cluster status. if (!this.masterless) { - this.csm = new ZkCoordinatedStateManager(this); + if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + this.csm = new ZkCoordinatedStateManager(this); + } masterAddressTracker = new MasterAddressTracker(getZooKeeper(), this); masterAddressTracker.start(); @@ -1949,6 +1955,8 @@ public class HRegionServer extends HasThread implements this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK, conf.getInt("hbase.storescanner.parallel.seek.threads", 10)); } + this.executorService.startExecutorService(ExecutorType.RS_WAL_SPLITTING, conf.getInt( + "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt( "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS)); // Start the threads for compacted files discharger @@ -1993,7 +2001,8 @@ public class HRegionServer extends HasThread implements sinkConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, conf.getInt("hbase.log.replay.rpc.timeout", 30000)); // default 30 seconds 
sinkConf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1); - if (this.csm != null) { + if (this.csm != null + && conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { // SplitLogWorker needs csm. If none, don't start this. this.splitLogWorker = new SplitLogWorker(sinkConf, this, this, walFactory); splitLogWorker.start(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java new file mode 100644 index 0000000..f9895a8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitWALCallable.java @@ -0,0 +1,294 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.Optional;
import java.util.concurrent.locks.Lock;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.KeyLocker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;

/**
 * Region-server side callable that splits one WAL on behalf of the master's
 * SplitWALRemoteProcedure. The actual split runs asynchronously in the executor service;
 * {@link #call()} blocks until it completes and rethrows any failure.
 */
@InterfaceAudience.Private
public class SplitWALCallable implements RSProcedureCallable {
  private static final Logger LOG = LoggerFactory.getLogger(SplitWALCallable.class);

  // One locker shared by the whole JVM: concurrent split requests for the *same* WAL must
  // serialize even though each request arrives as a separate callable instance. As an
  // instance field (the original code) the locker was private to each callable and provided
  // no mutual exclusion at all.
  private static final KeyLocker<String> splitWALLocks = new KeyLocker<>();

  /** WAL to split, relative to the WAL root directory; set in {@link #init}. */
  private String walPath;
  /** Error raised during {@link #init}; rethrown by {@link #call()}. */
  private Exception initError;
  /** Error raised by the async split task; rethrown by {@link #call()}. */
  private Exception processError;
  private HRegionServer rs;
  /** Guarded by {@code this}; set true by {@link #finish(boolean)}. */
  private boolean finished = false;
  /** The per-WAL lock currently held by this callable, for test visibility. */
  private volatile Lock splitWALLock = null;

  @Override
  public void init(byte[] parameter, HRegionServer rs) {
    try {
      this.rs = rs;
      MasterProcedureProtos.SplitWALParameter param =
          MasterProcedureProtos.SplitWALParameter.parseFrom(parameter);
      // Reject requests dispatched to the wrong server; the master retries elsewhere.
      if (!rs.getServerName().equals(ProtobufUtil.toServerName(param.getServerName()))) {
        initError = new WrongTargetServerException(
            "this request sent to a wrong target server " + rs.getServerName());
      }
      this.walPath = param.getWalPath();
    } catch (InvalidProtocolBufferException e) {
      initError = e;
    }
  }

  @Override
  public EventType getEventType() {
    return EventType.RS_LOG_SPLITTING;
  }

  @Override
  public Void call() throws Exception {
    if (initError != null) {
      throw initError;
    }
    // Hold the per-WAL lock across the whole split (submission AND completion) so that two
    // concurrent requests for the same WAL cannot split it at the same time. The original
    // code released the lock right after submitting the async task, which left the actual
    // split work unprotected.
    splitWALLock = splitWALLocks.acquireLock(walPath);
    try {
      // submit split wal task to the executor and wait for it to finish
      splitWal();
      waitForComplete();
    } finally {
      splitWALLock.unlock();
    }
    if (processError != null) {
      throw processError;
    }
    return null;
  }

  /**
   * Called by the async handler when the split attempt ends.
   * @param success whether the WAL was split successfully
   */
  public void finish(boolean success) {
    if (!success) {
      // Include the server name; the original message ended with a dangling "at server ".
      processError = new IOException(
          "Split wal " + walPath + " failed at server " + (rs != null ? rs.getServerName() : ""));
    }
    synchronized (this) {
      finished = true;
      notifyAll();
    }
  }

  @VisibleForTesting
  public synchronized boolean isFinished() {
    return finished;
  }

  @VisibleForTesting
  public boolean isLocked() {
    return splitWALLock != null;
  }

  public String getWalPath() {
    return this.walPath;
  }

  /**
   * Block until {@link #finish(boolean)} runs. The {@code finished} flag is checked under the
   * monitor (the original checked it outside, a classic guarded-wait race); the 2s timeout on
   * wait() is kept only to emit the periodic progress log.
   */
  private void waitForComplete() throws InterruptedException {
    synchronized (this) {
      while (!finished) {
        LOG.debug("splitting is still ongoing for WAL {}", this.walPath);
        wait(2000);
      }
    }
    LOG.info("split WAL succeed {}", this.walPath);
  }

  /** Hand the split work to the RS executor service; completion is reported via finish(). */
  private void splitWal() {
    // Lambda parameter renamed from "walPath" to avoid shadowing the field of the same name.
    WALSplittingHandler walSplittingHandler = new WALSplittingHandler(rs, EventType.RS_LOG_REPLAY,
        wal -> splitWAL(wal, rs.getConfiguration(), rs, rs, rs.walFactory), walPath, this);
    rs.getExecutorService().submit(walSplittingHandler);
  }

  /** Executes the split of one WAL, returning whether it succeeded. */
  @FunctionalInterface
  public interface SplitWALTaskExecutor {
    boolean exec(String name);
  }

  /**
   * Do the actual split of {@code name}.
   * @return true on success or when nothing is left to recover; false when the attempt should
   *         be retried by the master (resigning errors, connectivity problems, etc.)
   */
  private static boolean splitWAL(String name, Configuration conf, RegionServerServices server,
      LastSequenceId sequenceIdChecker, WALFactory factory) {
    Path walDir;
    FileSystem fs;
    try {
      walDir = FSUtils.getWALRootDir(conf);
      fs = walDir.getFileSystem(conf);
    } catch (IOException e) {
      LOG.warn("could not find root dir or fs", e);
      return false;
    }
    try {
      if (!processSyncReplicationWAL(name, conf, server, fs, walDir)) {
        // The sync-replication logic decided this file needs no further splitting.
        return true;
      }
    } catch (IOException e) {
      LOG.warn("failed to process sync replication wal {}", name, e);
      return false;
    }
    try {
      if (!WALSplitter.splitLogFile(walDir, fs.getFileStatus(new Path(walDir, name)), fs, conf,
        null, sequenceIdChecker, null, factory)) {
        return false;
      }
    } catch (InterruptedIOException iioe) {
      LOG.warn("log splitting of " + name + " interrupted, resigning", iioe);
      return false;
    } catch (IOException e) {
      if (e instanceof FileNotFoundException) {
        // A wal file may not exist anymore. Nothing can be recovered so move on
        LOG.warn("WAL {} does not exist anymore", name, e);
        return true;
      }
      Throwable cause = e.getCause();
      if (e instanceof RetriesExhaustedException && (cause instanceof NotServingRegionException
          || cause instanceof ConnectException || cause instanceof SocketTimeoutException)) {
        LOG.warn(
          "WAL splitting of " + name + " can't connect to the target regionserver, resigning", e);
        return false;
      } else if (cause instanceof InterruptedException) {
        LOG.warn("WAL splitting of " + name + " interrupted, resigning", e);
        return false;
      }
      LOG.warn("WAL splitting of " + name + " failed, returning error", e);
      return false;
    }
    return true;
  }

  /**
   * Handle sync-replication bookkeeping for this WAL before splitting it.
   * @return whether we still need to continue the split work for this file
   */
  private static boolean processSyncReplicationWAL(String name, Configuration conf,
      RegionServerServices server, FileSystem fs, Path walDir) throws IOException {
    Path walFile = new Path(walDir, name);
    String filename = walFile.getName();
    Optional<String> optSyncPeerId =
        SyncReplicationWALProvider.getSyncReplicationPeerIdFromWALName(filename);
    if (!optSyncPeerId.isPresent()) {
      return true;
    }
    String peerId = optSyncPeerId.get();
    ReplicationPeerImpl peer =
        server.getReplicationSourceService().getReplicationPeers().getPeer(peerId);
    if (peer == null || !peer.getPeerConfig().isSyncReplication()) {
      return true;
    }
    Pair<SyncReplicationState, SyncReplicationState> stateAndNewState =
        peer.getSyncReplicationStateAndNewState();
    if (stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE)
        && stateAndNewState.getSecond().equals(SyncReplicationState.NONE)) {
      // copy the file to remote and overwrite the previous one
      String remoteWALDir = peer.getPeerConfig().getRemoteWALDir();
      Path remoteWALDirForPeer = ReplicationUtils.getPeerRemoteWALDir(remoteWALDir, peerId);
      Path tmpRemoteWAL = new Path(remoteWALDirForPeer, filename + ".tmp");
      FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
      try (FSDataInputStream in = fs.open(walFile); @SuppressWarnings("deprecation")
          FSDataOutputStream out = remoteFs.createNonRecursive(tmpRemoteWAL, true,
            FSUtils.getDefaultBufferSize(remoteFs), remoteFs.getDefaultReplication(tmpRemoteWAL),
            remoteFs.getDefaultBlockSize(tmpRemoteWAL), null)) {
        IOUtils.copy(in, out);
      }
      Path toCommitRemoteWAL =
          new Path(remoteWALDirForPeer, filename + ReplicationUtils.RENAME_WAL_SUFFIX);
      // Some FileSystem implementations may not support atomic rename so we need to do it in two
      // phases
      FSUtils.renameFile(remoteFs, tmpRemoteWAL, toCommitRemoteWAL);
      FSUtils.renameFile(remoteFs, toCommitRemoteWAL, new Path(remoteWALDirForPeer, filename));
    } else if ((stateAndNewState.getFirst().equals(SyncReplicationState.ACTIVE)
        && stateAndNewState.getSecond().equals(SyncReplicationState.STANDBY))
        || stateAndNewState.getFirst().equals(SyncReplicationState.STANDBY)) {
      // check whether we still need to process this file
      // actually we only write wal file which name is ended with .syncrep in A state, and after
      // transiting to a state other than A, we will reopen all the regions so the data in the wal
      // will be flushed so the wal file will be archived soon. But it is still possible that there
      // is a server crash when we are transiting from A to S, to simplify the logic of the transit
      // procedure, here we will also check the remote snapshot directory in state S, so that we do
      // not need wait until all the wal files with .syncrep suffix to be archived before finishing
      // the procedure.
      String remoteWALDir = peer.getPeerConfig().getRemoteWALDir();
      Path remoteSnapshotDirForPeer = ReplicationUtils.getPeerSnapshotWALDir(remoteWALDir, peerId);
      FileSystem remoteFs = ReplicationUtils.getRemoteWALFileSystem(conf, remoteWALDir);
      if (remoteFs.exists(new Path(remoteSnapshotDirForPeer, filename))) {
        // the file has been replayed when the remote cluster was transited from S to DA, the
        // content will be replicated back to us so give up split it.
        LOG.warn("Giveup splitting {} since it has been replayed in the remote cluster and "
            + "the content will be replicated back", filename);
        return false;
      }
    }
    return true;
  }

  /** Runs one split attempt on the executor thread and reports the result to the callable. */
  private static class WALSplittingHandler extends EventHandler {
    private final SplitWALTaskExecutor executor;
    private final String wal;
    private final SplitWALCallable callable;

    public WALSplittingHandler(Server server, EventType eventType, SplitWALTaskExecutor executor,
        String wal, SplitWALCallable callable) {
      super(server, eventType);
      this.executor = executor;
      this.wal = wal;
      this.callable = callable;
    }

    @Override
    public void process() throws IOException {
      LOG.info("start to process wal splitting of {}", wal);
      boolean success = executor.exec(wal);
      callable.finish(success);
    }
  }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * send a request to a wrong server + */ +@InterfaceAudience.Private +public class WrongTargetServerException extends DoNotRetryIOException { + public WrongTargetServerException() { + super(); + } + + public WrongTargetServerException(String message) { + super(message); + } + +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java new file mode 100644 index 0000000..9c6da17 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestMasterWalManager.java @@ -0,0 +1,269 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface.ServerOperationType.CRASH_HANDLER;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ServerProcedureInterface;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;

/**
 * Tests the procedure-v2 WAL-splitting entry points of {@link MasterWalManager}:
 * locating WALs per server, creating split procedures, and the split-worker
 * acquire/release bookkeeping.
 */
@Category({ MasterTests.class, MediumTests.class })
public class TestMasterWalManager {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestMasterWalManager.class);

  private static HBaseTestingUtility TEST_UTIL;
  private HMaster master;
  private MasterWalManager masterWalManager;
  private TableName TABLE_NAME;
  private byte[] FAMILY;

  @Before
  public void setup() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    // Exercise the procedure-v2 split path, not the ZK-coordinated one.
    TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
    // One splitter slot per region server so worker exhaustion is easy to trigger.
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.wal.max.splitters", 1);
    TEST_UTIL.startMiniCluster(3);
    master = TEST_UTIL.getHBaseCluster().getMaster();
    masterWalManager = master.getMasterWalManager();
    TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestMasterWalManager"));
    FAMILY = Bytes.toBytes("test");
  }

  @After
  public void teardown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /** Creates the test table and loads it so every region server writes to its WAL. */
  private void createAndLoadTable() throws Exception {
    TEST_UTIL.createTable(TABLE_NAME, FAMILY, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    // Close the Table handle we opened; the connection itself is shared.
    try (Table table = TEST_UTIL.getConnection().getTable(TABLE_NAME)) {
      TEST_UTIL.loadTable(table, FAMILY);
    }
  }

  /**
   * Returns the name of some live region server other than {@code excluded}.
   * Uses {@link ServerName#equals(Object)} — the instances returned by the mini cluster and by
   * getServerHoldingMeta() are distinct objects, so reference comparison would never exclude.
   */
  private ServerName otherServerThan(ServerName excluded) {
    return TEST_UTIL.getHBaseCluster().getRegionServerThreads().stream()
        .map(t -> t.getRegionServer().getServerName())
        .filter(sn -> !sn.equals(excluded))
        .findAny().get();
  }

  @Test
  public void testGetAllWalsToSplit() throws Exception {
    createAndLoadTable();
    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
    // The meta server has exactly one meta WAL and one non-meta WAL.
    List<FileStatus> metaWals = masterWalManager.getWALsToSplit(metaServer, true);
    Assert.assertEquals(1, metaWals.size());
    List<FileStatus> wals = masterWalManager.getWALsToSplit(metaServer, false);
    Assert.assertEquals(1, wals.size());
    // A server not hosting meta has no meta WAL.
    ServerName testServer = otherServerThan(metaServer);
    metaWals = masterWalManager.getWALsToSplit(testServer, true);
    Assert.assertEquals(0, metaWals.size());
  }

  @Test
  public void testSplitLogsByProcedure() throws Exception {
    createAndLoadTable();
    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
    ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta();
    ServerName testServer = otherServerThan(metaServer);

    // Splitting the non-meta WAL of a non-meta server removes it.
    Procedure[] procedures = masterWalManager.splitLogs(testServer, false);
    Assert.assertEquals(1, procedures.length);
    ProcedureTestingUtility.submitAndWait(masterPE, procedures[0]);
    Assert.assertEquals(0, masterWalManager.getWALsToSplit(testServer, false).size());

    // Splitting the meta WAL leaves the meta server's non-meta WAL in place.
    procedures = masterWalManager.splitLogs(metaServer, true);
    Assert.assertEquals(1, procedures.length);
    ProcedureTestingUtility.submitAndWait(masterPE, procedures[0]);
    Assert.assertEquals(0, masterWalManager.getWALsToSplit(metaServer, true).size());
    Assert.assertEquals(1, masterWalManager.getWALsToSplit(metaServer, false).size());
  }

  @Test
  public void testAcquireAndReleaseSplitWorker() throws Exception {
    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
    List<FakeServerProcedure> testProcedures = new ArrayList<>();
    // Occupy the single splitter slot on each of the 3 servers.
    for (int i = 0; i < 3; i++) {
      FakeServerProcedure procedure =
          new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getRegionServer(i).getServerName());
      testProcedures.add(procedure);
      ProcedureTestingUtility.submitProcedure(masterPE, procedure, HConstants.NO_NONCE,
        HConstants.NO_NONCE);
    }
    TEST_UTIL.waitFor(5000, () -> testProcedures.get(2).isWorkerAcquired());
    FakeServerProcedure failedProcedure =
        new FakeServerProcedure(TEST_UTIL.getHBaseCluster().getServerHoldingMeta());
    ProcedureTestingUtility.submitProcedure(masterPE, failedProcedure, HConstants.NO_NONCE,
      HConstants.NO_NONCE);
    // Negative check: no released worker exists, so acquisition must not happen.
    // There is no event to wait on for "did not acquire", hence the fixed sleep.
    Thread.sleep(1000);
    Assert.assertFalse(failedProcedure.isWorkerAcquired());
    // Let one procedure finish and release its worker; the blocked procedure should pick it up.
    testProcedures.get(0).countDown();
    TEST_UTIL.waitFor(5000, () -> failedProcedure.isWorkerAcquired());
    TEST_UTIL.waitFor(5000, () -> testProcedures.get(0).isSuccess());
  }

  /**
   * A minimal server procedure that acquires a split-WAL worker, parks on a latch, then releases
   * the worker. Used to occupy and free worker slots deterministically from tests.
   */
  public static final class FakeServerProcedure
      extends StateMachineProcedure<MasterProcedureEnv, MasterProcedureProtos.SplitWALState>
      implements ServerProcedureInterface {

    private ServerName serverName;
    private ServerName worker;
    private CountDownLatch barrier = new CountDownLatch(1);

    // Required no-arg constructor for the procedure framework.
    public FakeServerProcedure() {
    }

    public FakeServerProcedure(ServerName serverName) {
      this.serverName = serverName;
    }

    @Override
    public ServerName getServerName() {
      return serverName;
    }

    @Override
    public boolean hasMetaTableRegion() {
      return false;
    }

    @Override
    public ServerOperationType getServerOperationType() {
      return CRASH_HANDLER;
    }

    @Override
    protected Flow executeFromState(MasterProcedureEnv env,
        MasterProcedureProtos.SplitWALState state)
        throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
      MasterWalManager walManager = env.getMasterServices().getMasterWalManager();
      switch (state) {
        case ACQUIRE_SPLIT_WAL_WORKER:
          // May throw ProcedureSuspendedException when all workers are busy.
          worker = walManager.acquireSplitWALWorker(this);
          setNextState(MasterProcedureProtos.SplitWALState.DISPATCH_WAL_TO_WORKER);
          return Flow.HAS_MORE_STATE;
        case DISPATCH_WAL_TO_WORKER:
          // Hold the worker until the test calls countDown().
          barrier.await();
          setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
          return Flow.HAS_MORE_STATE;
        case RELEASE_SPLIT_WORKER:
          walManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
          return Flow.NO_MORE_STATE;
        default:
          throw new UnsupportedOperationException("unhandled state=" + state);
      }
    }

    public boolean isWorkerAcquired() {
      return worker != null;
    }

    /** Unblocks the procedure so it releases its worker and finishes. */
    public void countDown() {
      this.barrier.countDown();
    }

    @Override
    protected void rollbackState(MasterProcedureEnv env, MasterProcedureProtos.SplitWALState state)
        throws IOException, InterruptedException {
    }

    @Override
    protected MasterProcedureProtos.SplitWALState getState(int stateId) {
      return MasterProcedureProtos.SplitWALState.forNumber(stateId);
    }

    @Override
    protected int getStateId(MasterProcedureProtos.SplitWALState state) {
      return state.getNumber();
    }

    @Override
    protected MasterProcedureProtos.SplitWALState getInitialState() {
      return MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER;
    }

    @Override
    protected LockState acquireLock(final MasterProcedureEnv env) {
      if (env.getProcedureScheduler().waitServerExclusiveLock(this, getServerName())) {
        return LockState.LOCK_EVENT_WAIT;
      }
      return LockState.LOCK_ACQUIRED;
    }

    @Override
    protected void releaseLock(final MasterProcedureEnv env) {
      env.getProcedureScheduler().wakeServerExclusiveLock(this, getServerName());
    }

    @Override
    protected boolean holdLock(MasterProcedureEnv env) {
      return true;
    }

    @Override
    protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
    }

    @Override
    protected boolean abort(MasterProcedureEnv env) {
      return false;
    }

    // No persistent state: the procedure is test-only and never survives a restart.
    @Override
    protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
    }

    @Override
    protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
    }
  }
}
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.master; + +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil; +import org.apache.hadoop.hbase.wal.AbstractFSWALProvider; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import 
org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +@Category({ MasterTests.class, MediumTests.class }) + +public class TestSplitLogManagerForProcedureV2 { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSplitLogManagerForProcedureV2.class); + + private static HBaseTestingUtility TEST_UTIL; + private HMaster master; + private SplitLogManager splitLogManager; + private TableName TABLE_NAME; + private byte[] FAMILY; + + @Before + public void setup() throws Exception { + TEST_UTIL = new HBaseTestingUtility(); + TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false); + TEST_UTIL.getConfiguration().setInt("hbase.regionserver.wal.max.splitters", 1); + TEST_UTIL.startMiniCluster(3); + master = TEST_UTIL.getHBaseCluster().getMaster(); + splitLogManager = master.getMasterWalManager().getSplitLogManager(); + TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitLogManagerForProcedureV2")); + FAMILY = Bytes.toBytes("test"); + } + + @After + public void teardown() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testAcquireAndReleaseSplitWorker() throws Exception { + List testProcedures = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + testProcedures.add(new TestMasterWalManager.FakeServerProcedure( + TEST_UTIL.getHBaseCluster().getServerHoldingMeta())); + } + ServerName server = splitLogManager.acquireSplitWorker(testProcedures.get(0)); + Assert.assertNotNull(server); + Assert.assertNotNull(splitLogManager.acquireSplitWorker(testProcedures.get(1))); + Assert.assertNotNull(splitLogManager.acquireSplitWorker(testProcedures.get(2))); + + Exception e = null; + try { + splitLogManager.acquireSplitWorker(testProcedures.get(3)); + } catch (ProcedureSuspendedException suspendException) { + e = suspendException; + } + Assert.assertNotNull(e); + Assert.assertTrue(e instanceof 
ProcedureSuspendedException); + + splitLogManager.releaseSplitWorker(server, TEST_UTIL.getHBaseCluster().getMaster() + .getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + Assert.assertNotNull(splitLogManager.acquireSplitWorker(testProcedures.get(3))); + } + + @Test + public void testAddNewServer() throws Exception { + List testProcedures = new ArrayList<>(); + for (int i = 0; i < 4; i++) { + testProcedures.add(new TestMasterWalManager.FakeServerProcedure( + TEST_UTIL.getHBaseCluster().getServerHoldingMeta())); + } + ServerName server = splitLogManager.acquireSplitWorker(testProcedures.get(0)); + Assert.assertNotNull(server); + Assert.assertNotNull(splitLogManager.acquireSplitWorker(testProcedures.get(1))); + Assert.assertNotNull(splitLogManager.acquireSplitWorker(testProcedures.get(2))); + + Exception e = null; + try { + splitLogManager.acquireSplitWorker(testProcedures.get(3)); + } catch (ProcedureSuspendedException suspendException) { + e = suspendException; + } + Assert.assertNotNull(e); + Assert.assertTrue(e instanceof ProcedureSuspendedException); + + JVMClusterUtil.RegionServerThread newServer = TEST_UTIL.getHBaseCluster().startRegionServer(); + newServer.waitForServerOnline(); + Assert.assertNotNull(splitLogManager.acquireSplitWorker(testProcedures.get(3))); + } + + @Test + public void testCreateSplitWALProcedures() throws Exception { + TEST_UTIL.createTable(TABLE_NAME, FAMILY, TEST_UTIL.KEYS_FOR_HBA_CREATE_TABLE); + // load table + TEST_UTIL.loadTable(TEST_UTIL.getConnection().getTable(TABLE_NAME), FAMILY); + ProcedureExecutor masterPE = master.getMasterProcedureExecutor(); + ServerName metaServer = TEST_UTIL.getHBaseCluster().getServerHoldingMeta(); + Path metaWALDir = new Path(TEST_UTIL.getDefaultRootDirPath(), + AbstractFSWALProvider.getWALDirectoryName(metaServer.toString())); + // Test splitting meta wal + FileStatus[] wals = + TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.META_FILTER); + 
Assert.assertEquals(1, wals.length); + List testProcedures = + splitLogManager.createSplitWALProcedures(Lists.newArrayList(wals[0])); + Assert.assertEquals(1, testProcedures.size()); + ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); + Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); + + // Test splitting wal + wals = TEST_UTIL.getTestFileSystem().listStatus(metaWALDir, MasterWalManager.NON_META_FILTER); + Assert.assertEquals(1, wals.length); + testProcedures = splitLogManager.createSplitWALProcedures(Lists.newArrayList(wals[0])); + Assert.assertEquals(1, testProcedures.size()); + ProcedureTestingUtility.submitAndWait(masterPE, testProcedures.get(0)); + Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals[0].getPath())); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java index af2076e..9e34887 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java @@ -21,6 +21,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; @@ -44,9 +46,13 @@ import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +@RunWith(Parameterized.class) @Category({MasterTests.class, LargeTests.class}) public class TestServerCrashProcedure { @@ -58,6 +64,9 @@ 
public class TestServerCrashProcedure { protected HBaseTestingUtility util; + @Parameter + public boolean splitWALCoordinatedByZK; + private ProcedureMetrics serverCrashProcMetrics; private long serverCrashSubmittedCount = 0; private long serverCrashFailedCount = 0; @@ -67,6 +76,8 @@ public class TestServerCrashProcedure { conf.set("hbase.balancer.tablesOnMaster", "none"); conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 3); + LOG.info("coordinated by zk? {}", splitWALCoordinatedByZK); + conf.setBoolean(HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK, splitWALCoordinatedByZK); } @Before @@ -75,7 +86,7 @@ public class TestServerCrashProcedure { setupConf(this.util.getConfiguration()); startMiniCluster(); ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate( - this.util.getHBaseCluster().getMaster().getMasterProcedureExecutor(), false); + this.util.getHBaseCluster().getMaster().getMasterProcedureExecutor(), false); serverCrashProcMetrics = this.util.getHBaseCluster().getMaster().getMasterMetrics() .getServerCrashProcMetrics(); } @@ -87,10 +98,10 @@ public class TestServerCrashProcedure { @After public void tearDown() throws Exception { MiniHBaseCluster cluster = this.util.getHBaseCluster(); - HMaster master = cluster == null? null: cluster.getMaster(); + HMaster master = cluster == null ? 
null : cluster.getMaster(); if (master != null && master.getMasterProcedureExecutor() != null) { ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate( - master.getMasterProcedureExecutor(), false); + master.getMasterProcedureExecutor(), false); } this.util.shutdownMiniCluster(); } @@ -222,4 +233,9 @@ public class TestServerCrashProcedure { serverCrashSubmittedCount = serverCrashProcMetrics.getSubmittedCounter().getCount(); serverCrashFailedCount = serverCrashProcMetrics.getFailedCounter().getCount(); } + + @Parameterized.Parameters + public static Collection coordinatedByZK() { + return Arrays.asList(false, true); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSplitWALProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSplitWALProcedure.java new file mode 100644 index 0000000..ad619fd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestSplitWALProcedure.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
*/
package org.apache.hadoop.hbase.master.procedure;

import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;

import java.util.List;
import java.util.Optional;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterWalManager;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests {@link SplitWALProcedure} recovery paths: a split worker dying mid-split, and the
 * master restarting while a split is in flight.
 */
@Category({ MasterTests.class, MediumTests.class })
public class TestSplitWALProcedure {
  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestSplitWALProcedure.class);

  private static final Logger LOG = LoggerFactory.getLogger(TestSplitWALProcedure.class);

  private static HBaseTestingUtility TEST_UTIL;
  private HMaster master;
  private TableName TABLE_NAME;
  private MasterWalManager walManager;
  private byte[] FAMILY;

  @Before
  public void setup() throws Exception {
    TEST_UTIL = new HBaseTestingUtility();
    // Exercise the procedure-v2 split path, not the ZK-coordinated one.
    TEST_UTIL.getConfiguration().setBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, false);
    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.wal.max.splitters", 1);
    TEST_UTIL.startMiniCluster(3);
    master = TEST_UTIL.getHBaseCluster().getMaster();
    walManager = master.getMasterWalManager();
    TABLE_NAME = TableName.valueOf(Bytes.toBytes("TestSplitWALProcedure"));
    FAMILY = Bytes.toBytes("test");
  }

  @After
  public void teardown() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Creates and heavily loads the test table, then returns the (single) WAL of the first
   * region server, ready to be split.
   */
  private List<FileStatus> loadTableAndGetWALs() throws Exception {
    Table table = TEST_UTIL.createTable(TABLE_NAME, FAMILY,
        HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
    // Load repeatedly so splitting takes long enough for the test to interfere mid-flight.
    for (int i = 0; i < 100; i++) {
      TEST_UTIL.loadTable(table, FAMILY);
    }
    HRegionServer testServer = TEST_UTIL.getHBaseCluster().getRegionServer(0);
    List<FileStatus> wals = walManager.getWALsToSplit(testServer.getServerName(), false);
    Assert.assertEquals(1, wals.size());
    return wals;
  }

  @Test
  public void testHandleDeadWorker() throws Exception {
    List<FileStatus> wals = loadTableAndGetWALs();
    SplitWALProcedure splitWALProcedure =
        new SplitWALProcedure(wals.get(0).getPath().toString());
    ProcedureExecutor<MasterProcedureEnv> masterPE = master.getMasterProcedureExecutor();
    long pid = ProcedureTestingUtility.submitProcedure(masterPE, splitWALProcedure,
        HConstants.NO_NONCE, HConstants.NO_NONCE);
    TEST_UTIL.waitFor(5000, () -> splitWALProcedure.getWorker() != null);
    // Kill the worker mid-split; the procedure must re-dispatch and still succeed.
    TEST_UTIL.getHBaseCluster().killRegionServer(splitWALProcedure.getWorker());
    ProcedureTestingUtility.waitProcedure(masterPE, pid);
    Assert.assertTrue(splitWALProcedure.isSuccess());
  }

  @Test
  public void testMasterRestart() throws Exception {
    List<FileStatus> wals = loadTableAndGetWALs();
    SplitWALProcedure splitWALProcedure =
        new SplitWALProcedure(wals.get(0).getPath().toString());
    long pid = ProcedureTestingUtility.submitProcedure(master.getMasterProcedureExecutor(),
        splitWALProcedure, HConstants.NO_NONCE, HConstants.NO_NONCE);
    TEST_UTIL.waitFor(5000, () -> splitWALProcedure.getWorker() != null);
    // Kill the master while the split is in flight.
    TEST_UTIL.getHBaseCluster().killMaster(master.getServerName());
    TEST_UTIL.getHBaseCluster().waitForMasterToStop(master.getServerName(), 20000);
    // Restart the master; the procedure should be reloaded from the store and resumed.
    TEST_UTIL.getHBaseCluster().startMaster();
    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
    this.master = TEST_UTIL.getHBaseCluster().getMaster();
    ProcedureTestingUtility.waitProcedure(master.getMasterProcedureExecutor(), pid);
    Optional<Procedure<?>> procedure =
        master.getProcedures().stream().filter(p -> p.getProcId() == pid).findAny();
    // Make sure the reloaded procedure succeeded and the WAL was deleted.
    Assert.assertTrue(procedure.isPresent());
    Assert.assertTrue(procedure.get().isSuccess());
    Assert.assertFalse(TEST_UTIL.getTestFileSystem().exists(wals.get(0).getPath()));
  }
}