From ff75dd4f320fd3a5ebb763c00157324270b26016 Mon Sep 17 00:00:00 2001
From: Jingyun Tian
Date: Fri, 30 Nov 2018 18:51:09 +0800
Subject: [PATCH] HBASE-20610 Procedure V2 - Distributed Log Splitting

---
 .../src/main/protobuf/Admin.proto                  |   1 +
 .../src/main/protobuf/MasterProcedure.proto        |  15 ++
 .../hadoop/hbase/CoordinatedStateManager.java      |   2 +
 .../java/org/apache/hadoop/hbase/SplitLogTask.java |   4 +
 .../ProcedureSplitLogWorkerCoordination.java       | 128 ++++++++++++++++
 .../coordination/ZkCoordinatedStateManager.java    |   8 +
 .../apache/hadoop/hbase/executor/EventType.java    |   9 +-
 .../apache/hadoop/hbase/executor/ExecutorType.java |   3 +-
 .../org/apache/hadoop/hbase/master/HMaster.java    |  25 ++++
 .../apache/hadoop/hbase/master/MasterServices.java |   2 +
 .../hadoop/hbase/master/MasterWalManager.java      |  42 ++++++
 .../hbase/master/RoundRobinSplitWalBalancer.java   |  41 ++++++
 .../hadoop/hbase/master/SplitLogManager.java       |  72 ++++++++-
 .../hadoop/hbase/master/SplitWalBalancer.java      |  15 ++
 .../master/procedure/RSProcedureDispatcher.java    |   1 +
 .../master/procedure/ServerCrashProcedure.java     |  68 ++++++++-
 .../master/procedure/ServerProcedureInterface.java |   2 +-
 .../hadoop/hbase/master/procedure/ServerQueue.java |   2 +
 .../master/procedure/SplittingOneWalProcedure.java | 170 +++++++++++++++++++++
 .../hadoop/hbase/regionserver/HRegionServer.java   |   2 +
 .../hadoop/hbase/regionserver/SplitLogWorker.java  |   6 +-
 .../hbase/regionserver/SplittingLogCallable.java   |  70 +++++++++
 22 files changed, 677 insertions(+), 12 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ProcedureSplitLogWorkerCoordination.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/RoundRobinSplitWalBalancer.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWalBalancer.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplittingOneWalProcedure.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplittingLogCallable.java

diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
index c622d58..f46b59a 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
@@ -29,6 +29,7 @@ import "ClusterStatus.proto";
 import "HBase.proto";
 import "WAL.proto";
 import "Quota.proto";
+import "ZooKeeper.proto";
 
 message GetRegionInfoRequest {
   required RegionSpecifier region = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
index 44ac952..cbd8348d 100644
--- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
@@ -308,6 +308,8 @@ enum ServerCrashState {
   SERVER_CRASH_WAIT_ON_ASSIGN = 9;
   SERVER_CRASH_SPLIT_META_LOGS = 10;
   SERVER_CRASH_ASSIGN_META = 11;
+  SERVER_CRASH_SPLITTING_META_CHECK = 12;
+  SERVER_CRASH_SPLITTING_CHECK = 13;
   SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true];
   SERVER_CRASH_FINISH = 100;
 }
@@ -550,3 +552,16 @@ message OpenRegionProcedureStateData {
 message CloseRegionProcedureStateData {
   optional ServerName assign_candidate = 1;
 }
+
+message SplittingLogParameter {
+  required ServerName server_name = 1;
+  required string log_path = 2;
+}
+
+
+message SplittingOneWalData {
+  required ServerName server_name = 1;
+  required string log = 2;
+  required bool dispatch = 3;
+  required bool is_meta_wal = 4;
+}
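A note on the two new messages: SplittingOneWalData is the persisted state of SplittingOneWalProcedure below, so it has to capture everything needed to resume after a master restart: the WAL path, the currently assigned worker, and whether the work was already dispatched. A minimal round-trip sketch (Java; the builder and accessor names follow from the fields above, ProtobufUtil.toServerName is existing HBase code, and the path is purely illustrative):

    MasterProcedureProtos.SplittingOneWalData data =
        MasterProcedureProtos.SplittingOneWalData.newBuilder()
            .setServerName(ProtobufUtil.toServerName(serverName))
            .setLog("hdfs://ns/hbase/WALs/rs1,16020,1543571469000-splitting/wal.1543571470000")
            .setDispatch(false)
            .setIsMetaWal(false)
            .build();
    // read side: data.getServerName(), data.getLog(), data.getDispatch(), data.getIsMetaWal()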
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java
index ab146e7..3a2f3ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/CoordinatedStateManager.java
@@ -43,6 +43,8 @@ public interface CoordinatedStateManager {
    */
   SplitLogWorkerCoordination getSplitLogWorkerCoordination();
 
+  SplitLogWorkerCoordination getProcedureSplitLogWorkerCoordinator();
+
   /**
    * Method to retrieve coordination for split log manager
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
index dd4eb93..5984409 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/SplitLogTask.java
@@ -121,6 +121,10 @@ public class SplitLogTask {
     return this.state == ZooKeeperProtos.SplitLogTask.State.ERR;
   }
 
+  public ZooKeeperProtos.SplitLogTask.State getState() {
+    return this.state;
+  }
+
   @Override
   public String toString() {
     return this.state.toString() + " " + this.originServer.toString();
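The interface now exposes two worker-side coordinations side by side: the existing ZooKeeper based one and the new procedure based one, selected by the hbase.split.useZK switch this patch introduces. Caller-side shape, as a sketch (csm stands for the server's CoordinatedStateManager):

    SplitLogWorkerCoordination coordination =
        conf.getBoolean("hbase.split.useZK", true)
            ? csm.getSplitLogWorkerCoordination()          // legacy splitlog znodes
            : csm.getProcedureSplitLogWorkerCoordinator(); // master-driven procedures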
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ProcedureSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ProcedureSplitLogWorkerCoordination.java
new file mode 100644
index 0000000..63e9aaa
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ProcedureSplitLogWorkerCoordination.java
@@ -0,0 +1,128 @@
+package org.apache.hadoop.hbase.coordination;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.SplitLogTask;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
+import org.apache.hadoop.hbase.regionserver.SplittingLogCallable;
+import org.apache.hadoop.hbase.regionserver.handler.WALSplitterHandler;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class ProcedureSplitLogWorkerCoordination implements SplitLogWorkerCoordination {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ProcedureSplitLogWorkerCoordination.class);
+  private SplitLogWorker.TaskExecutor splitTaskExecutor;
+  private SplitLogWorker worker;
+  private RegionServerServices server;
+  private int maxConcurrentTasks = 0;
+  private BlockingQueue<String> tasks = new LinkedBlockingQueue<>();
+  private ConcurrentHashMap<String, SplittingLogCallable> tasksMap = new ConcurrentHashMap<>();
+  private AtomicBoolean shouldStop = new AtomicBoolean(false);
+  private AtomicInteger processedTaskCount = new AtomicInteger(0);
+  private AtomicInteger tasksInProgress = new AtomicInteger(0);
+
+  @Override
+  public void init(RegionServerServices server, Configuration conf,
+      SplitLogWorker.TaskExecutor splitTaskExecutor, SplitLogWorker worker) {
+    this.server = server;
+    this.splitTaskExecutor = splitTaskExecutor;
+    this.worker = worker;
+    this.maxConcurrentTasks =
+        conf.getInt("hbase.regionserver.wal.max.splitters", DEFAULT_MAX_SPLITTERS);
+  }
+
+  @Override
+  public void stopProcessingTasks() {
+    shouldStop.set(true);
+  }
+
+  @Override
+  public boolean isStop() {
+    return shouldStop.get();
+  }
+
+  @Override
+  public void taskLoop() throws InterruptedException {
+    while (!shouldStop.get()) {
+      String task = tasks.poll(5000, TimeUnit.MILLISECONDS);
+      if (task != null) {
+        LOG.info("grabbed task {}, start processing", task);
+        processLogSplittingTask(new ProcedureSplitTaskDetails(task));
+      }
+    }
+  }
+
+  private void processLogSplittingTask(ProcedureSplitTaskDetails details) {
+    processedTaskCount.getAndIncrement();
+    WALSplitterHandler hsh = new WALSplitterHandler(server, this, details, null,
+        this.tasksInProgress, splitTaskExecutor);
+    server.getExecutorService().submit(hsh);
+  }
+
+  @Override
+  public void markCorrupted(Path rootDir, String name, FileSystem fs) {
+  }
+
+  @Override
+  public boolean isReady() throws InterruptedException {
+    return true;
+  }
+
+  @Override
+  public int getTaskReadySeq() {
+    return 0;
+  }
+
+  @Override
+  public void registerListener() {
+  }
+
+  @Override
+  public void removeListener() {
+  }
+
+  @Override
+  public void endTask(SplitLogTask slt, LongAdder ctr, SplitTaskDetails splitTaskDetails) {
+    // send a signal to the SplittingLogCallable that is blocked on this task
+    String task = splitTaskDetails.getWALFile();
+    SplittingLogCallable callable = tasksMap.remove(task);
+    if (callable != null) {
+      callable.finish();
+    }
+    LOG.info("task {} removed from tasksMap", task);
+  }
+
+  public void addTask(String task, SplittingLogCallable splittingLogCallable)
+      throws InterruptedException {
+    this.tasksMap.put(task, splittingLogCallable);
+    this.tasks.put(task);
+    LOG.info("put the task {} into queue", task);
+  }
+
+  public static class ProcedureSplitTaskDetails implements SplitTaskDetails {
+    private String log;
+
+    public ProcedureSplitTaskDetails(String log) {
+      this.log = log;
+    }
+
+    @Override
+    public String getWALFile() {
+      return log;
+    }
+  }
+}
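The hand-off above is a plain producer/consumer pair: the RPC-side callable parks a task on the queue and blocks, taskLoop drains the queue and hands the WAL to a WALSplitterHandler, and endTask() releases the callable. A standalone model of that protocol, pure JDK with hypothetical names, for reference:

    BlockingQueue<String> tasks = new LinkedBlockingQueue<>();
    ConcurrentMap<String, CountDownLatch> waiters = new ConcurrentHashMap<>();

    // callable thread: enqueue, then wait for the splitter to finish
    waiters.put("wal-1", new CountDownLatch(1));
    tasks.put("wal-1");
    waiters.get("wal-1").await();

    // worker thread (the taskLoop side): poll, split, signal
    String task = tasks.poll(5, TimeUnit.SECONDS);
    if (task != null) {
      // ... run the WAL splitter ...
      waiters.get(task).countDown();
    }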
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
index 91b617f..7d219dd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkCoordinatedStateManager.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.regionserver.SplitLogWorker;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
@@ -38,12 +39,14 @@ public class ZkCoordinatedStateManager implements CoordinatedStateManager {
   protected ZKWatcher watcher;
   protected SplitLogWorkerCoordination splitLogWorkerCoordination;
   protected SplitLogManagerCoordination splitLogManagerCoordination;
+  protected SplitLogWorkerCoordination procedureSplitLogWorkerCoordinator;
 
   public ZkCoordinatedStateManager(Server server) {
     this.watcher = server.getZooKeeper();
     splitLogWorkerCoordination = new ZkSplitLogWorkerCoordination(server.getServerName(), watcher);
     splitLogManagerCoordination = new ZKSplitLogManagerCoordination(server.getConfiguration(),
         watcher);
+    procedureSplitLogWorkerCoordinator = new ProcedureSplitLogWorkerCoordination();
   }
 
   @Override
@@ -52,6 +55,11 @@ public class ZkCoordinatedStateManager implements CoordinatedStateManager {
   }
 
   @Override
+  public SplitLogWorkerCoordination getProcedureSplitLogWorkerCoordinator() {
+    return this.procedureSplitLogWorkerCoordinator;
+  }
+
+  @Override
   public SplitLogManagerCoordination getSplitLogManagerCoordination() {
     return splitLogManagerCoordination;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
index ad38d1c..202b27a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/EventType.java
@@ -288,7 +288,14 @@ public enum EventType {
    *
    * RS_REPLAY_SYNC_REPLICATION_WAL
    */
-  RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL);
+  RS_REPLAY_SYNC_REPLICATION_WAL(85, ExecutorType.RS_REPLAY_SYNC_REPLICATION_WAL),
+
+  /**
+   * RS splits a WAL found under a splitting dir
+   *
+   * RS_LOG_SPLITTING
+   */
+  RS_LOG_SPLITTING(86, ExecutorType.RS_LOG_SPLITTING);
 
   private final int code;
   private final ExecutorType executor;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
index ea97354..9555a82 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/executor/ExecutorType.java
@@ -48,7 +48,8 @@ public enum ExecutorType {
   RS_COMPACTED_FILES_DISCHARGER (29),
   RS_OPEN_PRIORITY_REGION (30),
   RS_REFRESH_PEER(31),
-  RS_REPLAY_SYNC_REPLICATION_WAL(32);
+  RS_REPLAY_SYNC_REPLICATION_WAL(32),
+  RS_LOG_SPLITTING(33);
 
   ExecutorType(int value) {
   }
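Each remote procedure callable declares an EventType, and the region server routes it to the matching executor pool, so the RS_LOG_SPLITTING(86)/RS_LOG_SPLITTING(33) pair is the thread pool that SplittingLogCallable (below) runs on. Roughly, the dispatch path looks like the following sketch (RSProcedureHandler is the existing handler that wraps RSProcedureCallables; the surrounding plumbing is elided):

    // on the region server, for each remote procedure received from the master:
    RSProcedureCallable callable = new SplittingLogCallable();
    callable.init(request.getProcData().toByteArray(), regionServer);
    regionServer.getExecutorService()
        .submit(new RSProcedureHandler(regionServer, procId, callable));
    // the executor pool is chosen by callable.getEventType(), i.e. RS_LOG_SPLITTING here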
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 132e271..99e3e69 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -35,6 +35,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -42,8 +43,12 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -89,6 +94,7 @@ import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
+import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorType;
 import org.apache.hadoop.hbase.favored.FavoredNodesManager;
 import org.apache.hadoop.hbase.favored.FavoredNodesPromoter;
@@ -440,6 +446,12 @@ public class HMaster extends HRegionServer implements MasterServices {
   private final boolean maintenanceMode;
   static final String MAINTENANCE_MODE = "hbase.master.maintenance_mode";
 
+  private volatile SplitWalBalancer splitWalBalancer;
+
+  ScheduledExecutorService splittingStatusChecker = Executors.newSingleThreadScheduledExecutor();
+  Map<ServerName, Set<String>> splittingStatusMap = new ConcurrentHashMap<>();
+
+
   public static class RedirectServlet extends HttpServlet {
     private static final long serialVersionUID = 2894774810058302473L;
     private final int regionServerInfoPort;
@@ -1855,6 +1867,19 @@ public class HMaster extends HRegionServer implements MasterServices {
       + "/" + RpcServer.getRemoteAddress().orElse(null);
   }
 
+  @Override
+  public SplitWalBalancer getSplitWalBalancer() {
+    // lazily created singleton; the field is volatile so double-checked locking is safe
+    if (this.splitWalBalancer == null) {
+      synchronized (this) {
+        if (splitWalBalancer == null) {
+          this.splitWalBalancer = new RoundRobinSplitWalBalancer(this);
+        }
+      }
+    }
+    return this.splitWalBalancer;
+  }
+
   /**
    * Switch for the background CatalogJanitor thread.
    * Used for testing. The thread will continue to run. It will just be a noop
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
index 27ef5f8..12454a9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
@@ -510,4 +510,6 @@ public interface MasterServices extends Server {
    * @return True if cluster is up; false if cluster is not up (we are shutting down).
    */
   boolean isClusterUp();
+
+  SplitWalBalancer getSplitWalBalancer();
 }
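On the lazy singleton above: double-checked locking is only correct when the field is volatile (hence the volatile added here); without it, a racing reader can observe a partially constructed balancer. If the paired null checks feel fragile, an equivalent race-free alternative using only the JDK, as a sketch:

    private final AtomicReference<SplitWalBalancer> balancerRef = new AtomicReference<>();

    public SplitWalBalancer getSplitWalBalancer() {
      // may construct a throwaway instance under a race, which is harmless for a
      // stateless balancer, and avoids the double-checked locking subtleties entirely
      SplitWalBalancer b = balancerRef.get();
      if (b == null) {
        balancerRef.compareAndSet(null, new RoundRobinSplitWalBalancer(this));
        b = balancerRef.get();
      }
      return b;
    }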
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
index 2b1a81f..b838c1e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java
@@ -20,14 +20,18 @@ package org.apache.hadoop.hbase.master;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,6 +39,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.master.procedure.SplittingOneWalProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
@@ -356,4 +363,39 @@ public class MasterWalManager {
       }
     }
   }
+
+  public Procedure[] splitLogsByProcedure(ServerName serverName, boolean splitMeta,
+      SplitWalBalancer splitLogBalancer) {
+    List<Procedure> logSplittingProcedures = null;
+    try {
+      // 1. list all files still sitting under the splitting dir
+      List<FileStatus> splittingFiles = getAllWalsToSplit(serverName, splitMeta);
+      // 2. create one procedure per WAL, placed by the balancer
+      logSplittingProcedures =
+          splitLogManager.createSplitOneWalProcedures(splittingFiles, splitLogBalancer, splitMeta);
+    } catch (IOException e) {
+      LOG.error("caught an IOException while creating procedures for splitting logs of server {}",
+        serverName, e);
+      // TODO how to deal with IOException? Abort?
+    }
+
+    return logSplittingProcedures == null ? new Procedure[0]
+        : logSplittingProcedures.toArray(new Procedure[logSplittingProcedures.size()]);
+  }
+
+  public List<FileStatus> getAllWalsToSplit(ServerName serverName, boolean splitMeta)
+      throws IOException {
+    List<Path> logDirs = getLogDirs(Collections.singleton(serverName));
+    LOG.info("number of log dirs is {}", logDirs.size());
+    logDirs.stream().forEach(log -> LOG.info("got a log dir: {}", log));
+    FileStatus[] fileStatuses =
+        SplitLogManager.getFileList(this.conf, logDirs, splitMeta ? META_FILTER : NON_META_FILTER);
+    LOG.info("number of logs is {}", fileStatuses.length);
+    return Lists.newArrayList(fileStatuses);
+  }
+
+  public Path getSplittingDir(ServerName serverName) {
+    Path logDir = new Path(this.rootDir,
+        AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
+    Path splitDir = logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT);
+    return splitDir;
+  }
 }
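For orientation, the directory convention these helpers rely on: when a server dies, its WAL directory is renamed with a -splitting suffix and recovery works on that frozen directory. Sketch, with illustrative paths:

    // <rootdir>/WALs/rs1,16020,1543571469000             live WAL dir
    // <rootdir>/WALs/rs1,16020,1543571469000-splitting   same dir after rename; input to recovery
    Path logDir =
        new Path(rootDir, AbstractFSWALProvider.getWALDirectoryName(serverName.toString()));
    Path splitDir = logDir.suffix(AbstractFSWALProvider.SPLITTING_EXT); // "-splitting"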
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RoundRobinSplitWalBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RoundRobinSplitWalBalancer.java
new file mode 100644
index 0000000..0f1e412
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RoundRobinSplitWalBalancer.java
@@ -0,0 +1,41 @@
+package org.apache.hadoop.hbase.master;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class RoundRobinSplitWalBalancer implements SplitWalBalancer {
+  private final MasterServices master;
+  private final AtomicInteger index = new AtomicInteger(0);
+  private static final int SOFT_UPPER_BOUND = 100000;
+
+  public RoundRobinSplitWalBalancer(MasterServices master) {
+    this.master = master;
+  }
+
+  @Override
+  public Map<FileStatus, ServerName> getSplittingPlan(List<FileStatus> fileStatuses) {
+    Map<FileStatus, ServerName> plan = new HashMap<>();
+    List<ServerName> onlineServers = master.getServerManager().getOnlineServersList();
+    int i = 0;
+    for (FileStatus file : fileStatuses) {
+      plan.put(file, onlineServers.get(i % onlineServers.size()));
+      i++;
+    }
+    return plan;
+  }
+
+  @Override
+  public ServerName getSplittingTargetServer(String log) {
+    List<ServerName> onlineServers = master.getServerManager().getOnlineServersList();
+    // wrap before overflow; doing it inside the atomic update avoids a lost-reset race
+    int currentIndex = index.getAndUpdate(prev -> prev >= SOFT_UPPER_BOUND ? 0 : prev + 1);
+    return onlineServers.get(currentIndex % onlineServers.size());
+  }
+}
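Behavior of the planner, concretely: with online servers [s0, s1, s2], WALs are assigned wal0 to s0, wal1 to s1, wal2 to s2, wal3 to s0, and so on, and getSplittingTargetServer rotates the same way across retry calls. Hypothetical usage from a master context:

    SplitWalBalancer balancer = master.getSplitWalBalancer();
    Map<FileStatus, ServerName> plan = balancer.getSplittingPlan(walsOfDeadServer);
    ServerName retryTarget =
        balancer.getSplittingTargetServer("hdfs://ns/hbase/WALs/rs1,16020,1-splitting/wal.0");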
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
index 4d977d3..c729c10 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitLogManager.java
@@ -49,8 +49,10 @@ import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
 import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination.SplitLogManagerDetails;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.master.procedure.SplittingOneWalProcedure;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HasThread;
@@ -105,7 +107,8 @@ public class SplitLogManager {
 
   @VisibleForTesting
   final ConcurrentMap<String, Task> tasks = new ConcurrentHashMap<>();
-  private TimeoutMonitor timeoutMonitor;
+  private ConcurrentHashMap<ServerName, HashSet<SplittingOneWalProcedure>> taskMap = new ConcurrentHashMap<>();
+  private ScheduledChore timeoutMonitor;
 
   private volatile Set<ServerName> deadWorkers = null;
   private final Object deadWorkersLock = new Object();
@@ -136,9 +139,14 @@ public class SplitLogManager {
     }
     this.unassignedTimeout =
         conf.getInt("hbase.splitlog.manager.unassigned.timeout", DEFAULT_UNASSIGNED_TIMEOUT);
-    this.timeoutMonitor =
-        new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
-            master);
+    if (conf.getBoolean("hbase.split.useZK", true)) {
+      this.timeoutMonitor =
+          new TimeoutMonitor(conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000),
+              master);
+    } else {
+      this.timeoutMonitor = new ProcedureTimeoutMonitor(
+          conf.getInt("hbase.splitlog.manager.timeoutmonitor.period", 1000), master);
+    }
     choreService.scheduleChore(timeoutMonitor);
   }
@@ -183,6 +191,20 @@ public class SplitLogManager {
     return fileStatus.toArray(a);
   }
 
+  public List<Procedure> createSplitOneWalProcedures(List<FileStatus> splittingWals,
+      SplitWalBalancer splitLogBalancer, boolean splitMeta) {
+    List<Procedure> logSplittingProcedures = new ArrayList<>();
+    Map<FileStatus, ServerName> plan = splitLogBalancer.getSplittingPlan(splittingWals);
+    for (FileStatus splittingFile : splittingWals) {
+      SplittingOneWalProcedure procedure = new SplittingOneWalProcedure(plan.get(splittingFile),
+          splittingFile.getPath().toString(), splitMeta);
+      logSplittingProcedures.add(procedure);
+      taskMap.computeIfAbsent(procedure.getServerName(), serverName -> new HashSet<>())
+          .add(procedure);
+    }
+    return logSplittingProcedures;
+  }
+
   /**
    * @param logDir one region sever wal dir path in .logs
    * @throws IOException if there was an error while splitting any log file
@@ -535,6 +557,48 @@ public class SplitLogManager {
     }
   }
 
+  private class ProcedureTimeoutMonitor extends ScheduledChore {
+    public ProcedureTimeoutMonitor(final int period, Stoppable stopper) {
+      super("SplitLogManager Procedure Timeout Monitor", stopper, period);
+    }
+
+    @Override
+    protected void chore() {
+      Set<ServerName> localDeadWorkers;
+
+      synchronized (deadWorkersLock) {
+        localDeadWorkers = deadWorkers;
+        deadWorkers = null;
+      }
+
+      if (localDeadWorkers != null) {
+        for (ServerName deadWorker : localDeadWorkers) {
+          HashSet<SplittingOneWalProcedure> deadProcedures =
+              taskMap.getOrDefault(deadWorker, new HashSet<>());
+          deadProcedures.stream().forEach(deadProcedure -> failProcedure(deadProcedure,
+              new IOException("This server is dead, we need to submit the task to another one")));
+        }
+      }
+
+      // check if there is a task that exceeds the max timeout
+      for (HashSet<SplittingOneWalProcedure> taskSet : taskMap.values()) {
+        taskSet.stream().forEach(splittingOneWalProcedure -> {
+          LOG.info("elapsed time of procedure {} is {}", splittingOneWalProcedure,
+              splittingOneWalProcedure.elapsedTime());
+          if (splittingOneWalProcedure.elapsedTime() > 1000000) {
+            failProcedure(splittingOneWalProcedure,
+                new IOException("procedure spent too long, region server may hang..."));
+          }
+        });
+      }
+
+    }
+
+    private void failProcedure(SplittingOneWalProcedure procedure, IOException e) {
+      procedure.remoteCallFailed(server.getMasterProcedureExecutor().getEnvironment(),
+          procedure.getServerName(), e);
+    }
+  }
+
   /**
    * Periodically checks all active tasks and resubmits the ones that have timed out
    */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWalBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWalBalancer.java
new file mode 100644
index 0000000..380387e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWalBalancer.java
@@ -0,0 +1,15 @@
+package org.apache.hadoop.hbase.master;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public interface SplitWalBalancer {
+  Map<FileStatus, ServerName> getSplittingPlan(List<FileStatus> fileStatuses);
+
+  ServerName getSplittingTargetServer(String log);
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
index 638f9d3..313e639 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/RSProcedureDispatcher.java
@@ -24,6 +24,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SplitLogTask;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
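How the monitor's failProcedure translates into fail-over, end to end. This is a trace of calls that already exist in this patch, not new code:

    // ProcedureTimeoutMonitor.failProcedure(p, cause)
    //   -> p.remoteCallFailed(env, deadServer, cause)   // on SplittingOneWalProcedure
    //   -> complete(env, cause): succ = false; event.wake(scheduler)
    //   -> execute(env) runs again: dispatched && !succ, so serverName is cleared,
    //      the balancer picks a new target server, and the WAL is re-dispatched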
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
index 048bca8..81585be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerCrashProcedure.java
@@ -21,6 +21,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
@@ -28,9 +30,11 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.MasterWalManager;
+import org.apache.hadoop.hbase.master.SplitWalBalancer;
 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
 import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
 import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
+import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.procedure2.ProcedureMetrics;
 import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
 import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
@@ -137,8 +141,21 @@ public class ServerCrashProcedure
         }
         break;
       case SERVER_CRASH_SPLIT_META_LOGS:
-        splitMetaLogs(env);
-        setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
+        if (env.getMasterConfiguration().getBoolean("hbase.split.useZK", true)) {
+          splitMetaLogs(env);
+          setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
+        } else {
+          // create one child procedure per meta WAL
+          addChildProcedure(createSplittingWalProcedures(env, true));
+          setNextState(ServerCrashState.SERVER_CRASH_SPLITTING_META_CHECK);
+        }
+        break;
+      case SERVER_CRASH_SPLITTING_META_CHECK:
+        if (isSplittingDone(env, true)) {
+          setNextState(ServerCrashState.SERVER_CRASH_ASSIGN_META);
+        } else {
+          setNextState(ServerCrashState.SERVER_CRASH_SPLIT_META_LOGS);
+        }
         break;
       case SERVER_CRASH_ASSIGN_META:
         assignRegions(env, Arrays.asList(RegionInfoBuilder.FIRST_META_REGIONINFO));
@@ -156,8 +173,22 @@ public class ServerCrashProcedure
         }
         break;
       case SERVER_CRASH_SPLIT_LOGS:
-        splitLogs(env);
-        setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
+        if (env.getMasterConfiguration().getBoolean("hbase.split.useZK", true)) {
+          splitLogs(env);
+          setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
+        } else {
+          addChildProcedure(createSplittingWalProcedures(env, false));
+          setNextState(ServerCrashState.SERVER_CRASH_SPLITTING_CHECK);
+        }
+        break;
+      case SERVER_CRASH_SPLITTING_CHECK:
+        if (isSplittingDone(env, false)) {
+          // all WALs are split; remove the now-empty splitting dir
+          cleanupSplittingDir(env);
+          setNextState(ServerCrashState.SERVER_CRASH_ASSIGN);
+        } else {
+          setNextState(ServerCrashState.SERVER_CRASH_SPLIT_LOGS);
+        }
         break;
       case SERVER_CRASH_ASSIGN:
         // If no regions to assign, skip assign and skip to the finish.
@@ -191,6 +222,35 @@ public class ServerCrashProcedure
     return Flow.HAS_MORE_STATE;
   }
 
+  private void cleanupSplittingDir(MasterProcedureEnv env) {
+    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
+    Path splitDir = mwm.getSplittingDir(serverName);
+    try {
+      mwm.getFileSystem().delete(splitDir, false);
+    } catch (IOException e) {
+      LOG.error("delete splitting dir {} failed, ignoring", splitDir, e);
+    }
+  }
+
+  private boolean isSplittingDone(MasterProcedureEnv env, boolean splitMeta) {
+    LOG.info("start to check if splitting is finished");
+    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
+    try {
+      return mwm.getAllWalsToSplit(serverName, splitMeta).isEmpty();
+    } catch (IOException e) {
+      LOG.error("listing WALs of server {} failed", serverName, e);
+      return false;
+    }
+  }
+
+  private Procedure[] createSplittingWalProcedures(MasterProcedureEnv env, boolean splitMeta) {
+    LOG.info("Splitting WALs {}, isMeta: {}", this, splitMeta);
+    MasterWalManager mwm = env.getMasterServices().getMasterWalManager();
+    SplitWalBalancer splitWalBalancer = env.getMasterServices().getSplitWalBalancer();
+    Procedure[] procedures = mwm.splitLogsByProcedure(serverName, splitMeta, splitWalBalancer);
+    return procedures;
+  }
+
   private boolean filterDefaultMetaRegions() {
     if (regionsOnCrashedServer == null) {
       return false;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
index f3c10ef..3693ef4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerProcedureInterface.java
@@ -27,7 +27,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public interface ServerProcedureInterface {
   public enum ServerOperationType {
-    CRASH_HANDLER
+    CRASH_HANDLER, WAL_SPLIT
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
index 3a1b3c4..197cf78 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ServerQueue.java
@@ -35,6 +35,8 @@ class ServerQueue extends Queue<ServerName> {
     switch (spi.getServerOperationType()) {
       case CRASH_HANDLER:
         return true;
+      case WAL_SPLIT:
+        return false;
       default:
         break;
     }
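The crash-recovery loop this adds, in one picture, plus how to opt in. The hbase.split.useZK key is introduced by this patch; its default is unified to true (the old ZK path) across master and region server in this revision, since mismatched defaults would leave the master dispatching procedures while workers still listen on ZooKeeper:

    // SPLIT_META_LOGS -> SPLITTING_META_CHECK --(dir empty)--> ASSIGN_META
    //       ^                   |
    //       +--(meta WALs remain: re-create procedures and loop)
    // SPLIT_LOGS -> SPLITTING_CHECK --(dir empty)--> cleanup splitting dir -> ASSIGN
    conf.setBoolean("hbase.split.useZK", false); // choose procedure based splitting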
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplittingOneWalProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplittingOneWalProcedure.java
new file mode 100644
index 0000000..0e43c34
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplittingOneWalProcedure.java
@@ -0,0 +1,170 @@
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.procedure2.NoNodeDispatchException;
+import org.apache.hadoop.hbase.procedure2.NoServerDispatchException;
+import org.apache.hadoop.hbase.procedure2.NullTargetServerDispatchException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.regionserver.SplittingLogCallable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+
+@InterfaceAudience.Private
+public class SplittingOneWalProcedure extends Procedure<MasterProcedureEnv>
+    implements RemoteProcedureDispatcher.RemoteProcedure<MasterProcedureEnv, ServerName>,
+    ServerProcedureInterface {
+  private static final Logger LOG = LoggerFactory.getLogger(SplittingOneWalProcedure.class);
+  private String log;
+  private ServerName serverName;
+  private boolean dispatched;
+  private ProcedureEvent<?> event;
+  private boolean isMetaWal;
+  private boolean succ = false;
+
+  public SplittingOneWalProcedure() {
+  }
+
+  public SplittingOneWalProcedure(ServerName serverName, String log, boolean isMetaWal) {
+    this.serverName = serverName;
+    this.log = log;
+    this.isMetaWal = isMetaWal;
+  }
+
+  @Override
+  protected Procedure<MasterProcedureEnv>[] execute(MasterProcedureEnv env)
+      throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException {
+    if (dispatched) {
+      if (succ) {
+        return null;
+      } else {
+        // the last dispatch failed; pick a new target server below
+        this.serverName = null;
+      }
+      dispatched = false;
+    }
+    try {
+      if (serverName == null) {
+        this.serverName =
+            env.getMasterServices().getSplitWalBalancer().getSplittingTargetServer(log);
+      }
+      env.getRemoteDispatcher().addOperationToNode(serverName, this);
+    } catch (NoNodeDispatchException | NullTargetServerDispatchException
+        | NoServerDispatchException e) {
+      LOG.error("dispatch log {} to server {} failed, will retry", log, serverName, e);
+      this.serverName = null;
+      throw new ProcedureYieldException();
+    }
+    dispatched = true;
+    event = new ProcedureEvent<>(this);
+    event.suspendIfNotReady(this);
+    throw new ProcedureSuspendedException();
+  }
+
+  @Override
+  protected void rollback(MasterProcedureEnv env) throws IOException, InterruptedException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  protected boolean abort(MasterProcedureEnv env) {
+    return false;
+  }
+
+  @Override
+  protected void serializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    MasterProcedureProtos.SplittingOneWalData.Builder builder =
+        MasterProcedureProtos.SplittingOneWalData.newBuilder()
+            .setServerName(ProtobufUtil.toServerName(serverName)).setLog(log)
+            .setIsMetaWal(isMetaWal).setDispatch(dispatched);
+    serializer.serialize(builder.build());
+  }
+
+  @Override
+  protected void deserializeStateData(ProcedureStateSerializer serializer) throws IOException {
+    MasterProcedureProtos.SplittingOneWalData data =
+        serializer.deserialize(MasterProcedureProtos.SplittingOneWalData.class);
+    serverName = ProtobufUtil.toServerName(data.getServerName());
+    log = data.getLog();
+    isMetaWal = data.getIsMetaWal();
+    dispatched = data.getDispatch();
+  }
+
+  @Override
+  public RemoteProcedureDispatcher.RemoteOperation remoteCallBuild(MasterProcedureEnv env,
+      ServerName serverName) {
+    return new RSProcedureDispatcher.ServerOperation(this, getProcId(), SplittingLogCallable.class,
+        MasterProcedureProtos.SplittingLogParameter.newBuilder()
+            .setServerName(ProtobufUtil.toServerName(serverName)).setLogPath(log).build()
+            .toByteArray());
+  }
+
+  @Override
+  public void remoteCallFailed(MasterProcedureEnv env, ServerName serverName,
+      IOException exception) {
+    complete(env, exception);
+  }
+
+  @Override
+  public void remoteOperationCompleted(MasterProcedureEnv env) {
+    complete(env, null);
+  }
+
+  private void complete(MasterProcedureEnv env, Throwable error) {
+    if (error == null) {
+      LOG.info("remote {} split log {} success...", serverName, log);
+      FileSystem fs = env.getMasterFileSystem().getFileSystem();
+      try {
+        fs.delete(new Path(log), false);
+      } catch (IOException e) {
+        LOG.error("remove wal {} failed...", log, e);
+      }
+      succ = true;
+    } else {
+      LOG.error("split log {} failed on server {}, will retry on another server", log, serverName,
+          error);
+      succ = false;
+    }
+    event.wake(env.getProcedureScheduler());
+    event = null;
+  }
+
+  @Override
+  public void remoteOperationFailed(MasterProcedureEnv env, RemoteProcedureException error) {
+    // should dispatch to another rs?
+    LOG.error("remote call failed for splitting log {} on server {}", log, serverName, error);
+    complete(env, error);
+  }
+
+  @Override
+  public ServerName getServerName() {
+    return serverName;
+  }
+
+  @Override
+  public boolean hasMetaTableRegion() {
+    return isMetaWal;
+  }
+
+  @Override
+  public ServerOperationType getServerOperationType() {
+    return ServerOperationType.WAL_SPLIT;
+  }
+
+  public void skip(MasterProcedureEnv env) {
+    succ = true;
+    event.wake(env.getProcedureScheduler());
+    event = null;
+  }
+}
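A hypothetical driver for a single procedure, e.g. from a master-side unit test. ProcedureTestingUtility is HBase's existing test helper; target and walPath are placeholders:

    SplittingOneWalProcedure proc =
        new SplittingOneWalProcedure(target, walPath, /* isMetaWal= */ false);
    long procId = master.getMasterProcedureExecutor().submitProcedure(proc);
    ProcedureTestingUtility.waitProcedure(master.getMasterProcedureExecutor(), procId);
    // on success the WAL has been split on the target RS and deleted by complete()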
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index b9d606d..80eefa0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -1957,6 +1957,8 @@ public class HRegionServer extends HasThread implements
       this.executorService.startExecutorService(ExecutorType.RS_PARALLEL_SEEK,
         conf.getInt("hbase.storescanner.parallel.seek.threads", 10));
     }
+    this.executorService.startExecutorService(ExecutorType.RS_LOG_SPLITTING, conf.getInt(
+      "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
     this.executorService.startExecutorService(ExecutorType.RS_LOG_REPLAY_OPS, conf.getInt(
       "hbase.regionserver.wal.max.splitters", SplitLogWorkerCoordination.DEFAULT_MAX_SPLITTERS));
     // Start the threads for compacted files discharger
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
index 4a9712c..e127ff0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitLogWorker.java
@@ -81,7 +81,11 @@ public class SplitLogWorker implements Runnable {
   public SplitLogWorker(Server hserver, Configuration conf, RegionServerServices server,
       TaskExecutor splitTaskExecutor) {
     this.server = server;
-    this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination();
+    if (conf.getBoolean("hbase.split.useZK", true)) {
+      this.coordination = hserver.getCoordinatedStateManager().getSplitLogWorkerCoordination();
+    } else {
+      this.coordination =
+          hserver.getCoordinatedStateManager().getProcedureSplitLogWorkerCoordinator();
+    }
     coordination.init(server, conf, splitTaskExecutor, this);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplittingLogCallable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplittingLogCallable.java
new file mode 100644
index 0000000..7d7c6bb
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplittingLogCallable.java
@@ -0,0 +1,70 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hbase.coordination.ProcedureSplitLogWorkerCoordination;
+import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos;
+
+@InterfaceAudience.Private
+public class SplittingLogCallable implements RSProcedureCallable {
+  private static final Logger LOG = LoggerFactory.getLogger(SplittingLogCallable.class);
+
+  private String logPath;
+  private Exception initError;
+  private HRegionServer rs;
+  private AtomicBoolean isFinish = new AtomicBoolean(false);
+
+  @Override
+  public void init(byte[] parameter, HRegionServer rs) {
+    try {
+      this.rs = rs;
+      MasterProcedureProtos.SplittingLogParameter param =
+          MasterProcedureProtos.SplittingLogParameter.parseFrom(parameter);
+      if (!rs.getServerName().equals(ProtobufUtil.toServerName(param.getServerName()))) {
+        initError = new IOException("this request was sent to the wrong target server");
+      }
+      this.logPath = param.getLogPath();
+    } catch (InvalidProtocolBufferException e) {
+      initError = e;
+    }
+  }
+
+  @Override
+  public EventType getEventType() {
+    return EventType.RS_LOG_SPLITTING;
+  }
+
+  @Override
+  public Void call() throws Exception {
+    if (initError != null) {
+      throw initError;
+    }
+    SplitLogWorkerCoordination coordinator =
+        rs.getCoordinatedStateManager().getProcedureSplitLogWorkerCoordinator();
+    assert coordinator instanceof ProcedureSplitLogWorkerCoordination;
+    ((ProcedureSplitLogWorkerCoordination) coordinator).addTask(logPath, this);
+    waitForComplete();
+    return null;
+  }
+
+  public void finish() {
+    this.isFinish.set(true);
+  }
+
+  private void waitForComplete() throws InterruptedException {
+    while (!isFinish.get()) {
+      LOG.info("splitting is still ongoing for log {}", this.logPath);
+      TimeUnit.MILLISECONDS.sleep(5000);
+    }
+    LOG.info("split of log {} succeeded", this.logPath);
+  }
+}
-- 
2.7.4