diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
index ef87498..9fdb1ea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.coordination;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
 
@@ -199,27 +200,28 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
    * try to grab a 'lock' on the task zk node to own and execute the task.
    * <p>
    * @param path zk node for the task
+   * @return true if the task is successfully grabbed, otherwise false
    */
-  private void grabTask(String path) {
+  private boolean grabTask(String path) {
     Stat stat = new Stat();
     byte[] data;
     synchronized (grabTaskLock) {
       currentTask = path;
       workerInGrabTask = true;
       if (Thread.interrupted()) {
-        return;
+        return false;
       }
     }
     try {
       try {
         if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
           SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment();
-          return;
+          return false;
         }
       } catch (KeeperException e) {
         LOG.warn("Failed to get data for znode " + path, e);
         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
-        return;
+        return false;
       }
       SplitLogTask slt;
       try {
@@ -227,18 +229,18 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
       } catch (DeserializationException e) {
         LOG.warn("Failed parse data for znode " + path, e);
         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
-        return;
+        return false;
       }
       if (!slt.isUnassigned()) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment();
-        return;
+        return false;
       }
 
       currentVersion =
         attemptToOwnTask(true, watcher, server.getServerName(), path, stat.getVersion());
       if (currentVersion < 0) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment();
-        return;
+        return false;
       }
 
       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
@@ -249,7 +251,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
 
       endTask(new SplitLogTask.Done(server.getServerName()),
         SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
-      return;
+      return false;
     }
 
     LOG.info("worker " + server.getServerName() + " acquired task " + path);
@@ -266,6 +268,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
         LOG.warn("Interrupted while yielding for other region servers", e);
         Thread.currentThread().interrupt();
       }
+      return true;
     } finally {
       synchronized (grabTaskLock) {
         workerInGrabTask = false;
@@ -315,13 +318,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
     server.getExecutorService().submit(hsh);
   }
 
-  /**
-   * This function calculates how many splitters it could create based on expected average tasks per
-   * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
-   * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
-   * @param numTasks current total number of available tasks
-   */
-  private int calculateAvailableSplitters(int numTasks) {
+  private int getAvailableRSs() {
     // at lease one RS(itself) available
     int availableRSs = 1;
     try {
@@ -332,7 +329,17 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
       // do nothing
       LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
     }
+    return availableRSs;
+  }
+  /**
+   * This function calculates how many splitters it could create based on expected average tasks per
+   * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
+   * At any given time, an RS may spawn at most MIN(Expected Tasks/RS, Hard Upper Bound) splitters.
+   * @param availableRSs current total number of available regionservers
+   * @param numTasks current total number of available tasks
+   */
+  private int calculateAvailableSplitters(int availableRSs, int numTasks) {
 
     int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
     expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
     // calculate how many more splitters we could spawn
@@ -406,8 +413,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
           + " ... worker thread exiting.");
         return;
       }
+      // shuffle the paths so that different split log workers do not all start from the same
+      // log file after the meta log (if any)
+      Collections.shuffle(paths);
       // pick meta wal firstly
-      int offset = (int) (Math.random() * paths.size());
+      int offset = 0;
       for (int i = 0; i < paths.size(); i++) {
         if (AbstractFSWALProvider.isMetaFile(paths.get(i))) {
           offset = i;
@@ -415,21 +425,32 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
         }
       }
       int numTasks = paths.size();
+      int availableRSs = getAvailableRSs();
+      int taskGrabbed = 0;
       for (int i = 0; i < numTasks; i++) {
-        int idx = (i + offset) % paths.size();
-        // don't call ZKSplitLog.getNodeName() because that will lead to
-        // double encoding of the path name
-        if (this.calculateAvailableSplitters(numTasks) > 0) {
-          grabTask(ZNodePaths.joinZNode(watcher.znodePaths.splitLogZNode, paths.get(idx)));
-        } else {
-          LOG.debug("Current region server " + server.getServerName() + " has "
-            + this.tasksInProgress.get() + " tasks in progress and can't take more.");
-          break;
+        while (!shouldStop) {
+          if (this.calculateAvailableSplitters(availableRSs, numTasks) > 0) {
+            LOG.debug("Current region server " + server.getServerName()
+              + " is ready to take more tasks, will get task list and try grab tasks again.");
+            int idx = (i + offset) % paths.size();
+            // don't call ZKSplitLog.getNodeName() because that will lead to
+            // double encoding of the path name
+            taskGrabbed += grabTask(ZNodePaths.joinZNode(watcher.znodePaths.splitLogZNode, paths.get(idx))) ? 1 : 0;
+            break;
+          } else {
+            LOG.debug("Current region server " + server.getServerName() + " has "
+              + this.tasksInProgress.get() + " tasks in progress and can't take more.");
+            Thread.sleep(100);
+          }
         }
         if (shouldStop) {
           return;
         }
       }
+      if (taskGrabbed == 0 && !shouldStop) {
+        // sleep a little bit to reduce zk requests.
+        Thread.sleep(1000);
+      }
       SplitLogCounters.tot_wkr_task_grabing.increment();
       synchronized (taskReadySeq) {
         while (seq_start == taskReadySeq.get()) {
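For reviewers, a standalone sketch of the capacity math behind the patched calculateAvailableSplitters(availableRSs, numTasks): the expected per-RS load is ceil(numTasks / availableRSs), capped by the configured hard limit and reduced by the tasks this worker already has in progress ("how many more splitters we could spawn"). The class below is illustrative only and is not code from this patch; maxConcurrentTasks and tasksInProgress stand in for the coordination class's configured limit and its counter.

import java.util.concurrent.atomic.AtomicInteger;

public class SplitterCapacitySketch {

  // Hard upper bound; the real class reads this from configuration.
  private final int maxConcurrentTasks;
  // Tasks this worker is already splitting; the real class tracks this in an AtomicInteger too.
  private final AtomicInteger tasksInProgress = new AtomicInteger(0);

  SplitterCapacitySketch(int maxConcurrentTasks) {
    this.maxConcurrentTasks = maxConcurrentTasks;
  }

  /** MIN(ceil(numTasks / availableRSs), hard limit) minus the tasks already in progress. */
  int calculateAvailableSplitters(int availableRSs, int numTasks) {
    // ceil(numTasks / availableRSs), and at least 1
    int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
    expectedTasksPerRS = Math.max(1, expectedTasksPerRS);
    return Math.min(expectedTasksPerRS, maxConcurrentTasks) - tasksInProgress.get();
  }

  public static void main(String[] args) {
    SplitterCapacitySketch sketch = new SplitterCapacitySketch(2);
    // 10 tasks across 4 region servers: ceil(10/4) = 3 expected, capped to 2 by the hard limit.
    System.out.println(sketch.calculateAvailableSplitters(4, 10)); // prints 2
    sketch.tasksInProgress.incrementAndGet();
    System.out.println(sketch.calculateAvailableSplitters(4, 10)); // prints 1
  }
}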
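Likewise, a self-contained sketch of the reworked selection loop in taskLoop(): shuffle the candidate WALs so different workers do not all start from the same file, keep meta-WAL priority by starting the scan at its index, count successful grabs through the new boolean return, and back off when nothing could be grabbed. isMetaFile, grabTask and hasCapacity are illustrative stubs, not the ZooKeeper-backed methods in this patch, and the inner capacity wait is written as a simple while loop rather than the patch's while/if/else form.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class TaskSelectionSketch {

  // Illustrative stub for AbstractFSWALProvider.isMetaFile(path).
  static boolean isMetaFile(String path) {
    return path.contains(".meta");
  }

  // Illustrative stub for the ZooKeeper-backed grabTask(); here every attempt succeeds.
  static boolean grabTask(String path) {
    System.out.println("grabbed " + path);
    return true;
  }

  // Illustrative stub for calculateAvailableSplitters(availableRSs, numTasks) > 0.
  static boolean hasCapacity() {
    return true;
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> paths = new ArrayList<>(Arrays.asList("wal-1", "wal-2.meta", "wal-3", "wal-4"));

    // Shuffle so that different workers do not all start from the same WAL ...
    Collections.shuffle(paths);
    // ... but keep meta-WAL priority by starting the scan at its index.
    int offset = 0;
    for (int i = 0; i < paths.size(); i++) {
      if (isMetaFile(paths.get(i))) {
        offset = i;
        break;
      }
    }

    int taskGrabbed = 0;
    for (int i = 0; i < paths.size(); i++) {
      // Wait until this worker has spare splitter capacity, then attempt exactly one grab.
      while (!hasCapacity()) {
        Thread.sleep(100);
      }
      int idx = (i + offset) % paths.size();
      taskGrabbed += grabTask(paths.get(idx)) ? 1 : 0;
    }
    if (taskGrabbed == 0) {
      // Nothing was grabbed in this round; back off to reduce ZooKeeper traffic.
      Thread.sleep(1000);
    }
  }
}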