diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
index ef87498..e91c00a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.coordination;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
 
@@ -200,26 +201,26 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
    * <p>
    * @param path zk node for the task
    */
-  private void grabTask(String path) {
+  private int grabTask(String path) {
     Stat stat = new Stat();
     byte[] data;
     synchronized (grabTaskLock) {
       currentTask = path;
       workerInGrabTask = true;
       if (Thread.interrupted()) {
-        return;
+        return 0;
       }
     }
     try {
       try {
         if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
           SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.increment();
-          return;
+          return 0;
         }
       } catch (KeeperException e) {
         LOG.warn("Failed to get data for znode " + path, e);
         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
-        return;
+        return 0;
       }
       SplitLogTask slt;
       try {
@@ -227,18 +228,18 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
       } catch (DeserializationException e) {
         LOG.warn("Failed parse data for znode " + path, e);
         SplitLogCounters.tot_wkr_failed_to_grab_task_exception.increment();
-        return;
+        return 0;
       }
       if (!slt.isUnassigned()) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_owned.increment();
-        return;
+        return 0;
       }
 
       currentVersion =
           attemptToOwnTask(true, watcher, server.getServerName(), path, stat.getVersion());
       if (currentVersion < 0) {
         SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.increment();
-        return;
+        return 0;
       }
 
       if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
@@ -249,7 +250,7 @@
 
         endTask(new SplitLogTask.Done(server.getServerName()),
           SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
-        return;
+        return 0;
       }
 
       LOG.info("worker " + server.getServerName() + " acquired task " + path);
@@ -266,6 +267,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
         LOG.warn("Interrupted while yielding for other region servers", e);
         Thread.currentThread().interrupt();
       }
+      return 1;
     } finally {
       synchronized (grabTaskLock) {
         workerInGrabTask = false;
@@ -315,13 +317,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
     server.getExecutorService().submit(hsh);
   }
 
-  /**
-   * This function calculates how many splitters it could create based on expected average tasks per
-   * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration.<br>
-   * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
-   * @param numTasks current total number of available tasks
-   */
-  private int calculateAvailableSplitters(int numTasks) {
+  private int getAvailableRSs() {
     // at lease one RS(itself) available
     int availableRSs = 1;
     try {
@@ -332,7 +328,17 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
       // do nothing
       LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
     }
+    return availableRSs;
+  }
 
+  /**
+   * This function calculates how many splitters it could create based on the expected average
+   * number of tasks per RS and the hard upper bound (maxConcurrentTasks) set by configuration.<br>
+   * At any given time, a RS may spawn at most MIN(Expected Tasks/RS, Hard Upper Bound) splitters.
+   * @param availableRSs current number of available region servers
+   * @param numTasks current total number of available tasks
+   */
+  private int calculateAvailableSplitters(int availableRSs, int numTasks) {
     int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
     expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
     // calculate how many more splitters we could spawn
@@ -406,8 +412,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
           + " ... worker thread exiting.");
       return;
     }
+    // shuffle the paths so that the split log workers do not all start from the same log file
+    // after the meta log (if any)
+    Collections.shuffle(paths);
     // pick meta wal firstly
-    int offset = (int) (Math.random() * paths.size());
+    int offset = 0;
     for (int i = 0; i < paths.size(); i++) {
       if (AbstractFSWALProvider.isMetaFile(paths.get(i))) {
         offset = i;
@@ -415,21 +424,35 @@
       }
     }
     int numTasks = paths.size();
+    int availableRSs = getAvailableRSs();
+    int tasksGrabbed = 0;
     for (int i = 0; i < numTasks; i++) {
       int idx = (i + offset) % paths.size();
       // don't call ZKSplitLog.getNodeName() because that will lead to
       // double encoding of the path name
-      if (this.calculateAvailableSplitters(numTasks) > 0) {
-        grabTask(ZNodePaths.joinZNode(watcher.znodePaths.splitLogZNode, paths.get(idx)));
+      if (this.calculateAvailableSplitters(availableRSs, numTasks) > 0) {
+        tasksGrabbed += grabTask(ZNodePaths.joinZNode(watcher.znodePaths.splitLogZNode, paths.get(idx)));
       } else {
         LOG.debug("Current region server " + server.getServerName() + " has "
             + this.tasksInProgress.get() + " tasks in progress and can't take more.");
-        break;
+        while (!shouldStop) {
+          Thread.sleep(1000);
+          if (this.calculateAvailableSplitters(availableRSs, numTasks) > 0) {
+            LOG.debug("Current region server " + server.getServerName()
                + " is ready to take more tasks, will fetch the task list and try to grab tasks again.");
+            break;
+          }
+        }
       }
       if (shouldStop) {
         return;
       }
     }
+    if (tasksGrabbed == 0 && !shouldStop) {
+      // sleep a little bit to reduce the ZK request rate
+      int sleepTime = RandomUtils.nextInt(0, 100) + 500;
+      Thread.sleep(sleepTime);
+    }
     SplitLogCounters.tot_wkr_task_grabing.increment();
     synchronized (taskReadySeq) {
       while (seq_start == taskReadySeq.get()) {