From 8aebdea72d3204c972a793a6b20fe47e59dd8eef Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Fri, 12 Jul 2019 12:26:00 -0700 Subject: [PATCH] HBASE-22686 ZkSplitLogWorkerCoordination doesn't allow a regionserver to pick up all of the split work it is capable of --- .../ZkSplitLogWorkerCoordination.java | 70 ++++++++----------- .../regionserver/TestSplitLogWorker.java | 47 ------------- 2 files changed, 30 insertions(+), 87 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java index 7e6708ed1f..8a6a9fddf6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java @@ -207,26 +207,26 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements *

* @param path zk node for the task */ - private void grabTask(String path) { + private boolean grabTask(String path) { Stat stat = new Stat(); byte[] data; synchronized (grabTaskLock) { currentTask = path; workerInGrabTask = true; if (Thread.interrupted()) { - return; + return false; } } try { try { if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) { SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet(); - return; + return false; } } catch (KeeperException e) { LOG.warn("Failed to get data for znode " + path, e); SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); - return; + return false; } SplitLogTask slt; try { @@ -234,11 +234,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements } catch (DeserializationException e) { LOG.warn("Failed parse data for znode " + path, e); SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet(); - return; + return false; } if (!slt.isUnassigned()) { SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet(); - return; + return false; } currentVersion = @@ -246,7 +246,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements slt.getMode(), stat.getVersion()); if (currentVersion < 0) { SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet(); - return; + return false; } if (ZKSplitLog.isRescanNode(watcher, currentTask)) { @@ -257,7 +257,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements endTask(new SplitLogTask.Done(server.getServerName(), slt.getMode()), SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails); - return; + return false; } LOG.info("worker " + server.getServerName() + " acquired task " + path); @@ -274,6 +274,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements LOG.warn("Interrupted while yielding for other region servers", e); Thread.currentThread().interrupt(); } + return true; } finally { synchronized (grabTaskLock) { workerInGrabTask = false; @@ -324,29 +325,8 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements server.getExecutorService().submit(hsh); } - /** - * This function calculates how many splitters it could create based on expected average tasks per - * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration.
- * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound) - * @param numTasks current total number of available tasks - */ - private int calculateAvailableSplitters(int numTasks) { - // at lease one RS(itself) available - int availableRSs = 1; - try { - List regionServers = - ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode); - availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size()); - } catch (KeeperException e) { - // do nothing - LOG.debug("getAvailableRegionServers got ZooKeeper exception", e); - } - - int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1); - expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one - // calculate how many more splitters we could spawn - return Math.min(expectedTasksPerRS, maxConcurrentTasks) - - this.tasksInProgress.get(); + private boolean areSplittersAvailable() { + return maxConcurrentTasks - tasksInProgress.get() > 0; } /** @@ -424,21 +404,31 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements } } int numTasks = paths.size(); + boolean taskGrabbed = false; for (int i = 0; i < numTasks; i++) { - int idx = (i + offset) % paths.size(); - // don't call ZKSplitLog.getNodeName() because that will lead to - // double encoding of the path name - if (this.calculateAvailableSplitters(numTasks) > 0) { - grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx))); - } else { - LOG.debug("Current region server " + server.getServerName() + " has " - + this.tasksInProgress.get() + " tasks in progress and can't take more."); - break; + while (!shouldStop) { + if (this.areSplittersAvailable()) { + LOG.debug("Current region server " + server.getServerName() + + " is ready to take more tasks, will get task list and try grab tasks again."); + int idx = (i + offset) % paths.size(); + // don't call ZKSplitLog.getNodeName() because that will lead to + // double encoding of the path name + taskGrabbed |= grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx))); + break; + } else { + LOG.debug("Current region server " + server.getServerName() + " has " + + this.tasksInProgress.get() + " tasks in progress and can't take more."); + Thread.sleep(100); + } } if (shouldStop) { return; } } + if (!taskGrabbed && !shouldStop) { + // do not grab any tasks, sleep a little bit to reduce zk request. + Thread.sleep(1000); + } SplitLogCounters.tot_wkr_task_grabing.incrementAndGet(); synchronized (taskReadyLock) { while (seq_start == taskReadySeq.get()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index 48be76b23a..e5f41ec439 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -455,53 +455,6 @@ public class TestSplitLogWorker { } } - /** - * The test checks SplitLogWorker should not spawn more splitters than expected num of tasks per - * RS - * @throws Exception - */ - @Test(timeout=60000) - public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception { - LOG.info("testAcquireMultiTasks"); - SplitLogCounters.resetCounters(); - final String TATAS = "tatas"; - final ServerName RS = ServerName.valueOf("rs,1,1"); - final ServerName RS2 = ServerName.valueOf("rs,1,2"); - final int maxTasks = 3; - Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); - testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks); - RegionServerServices mockedRS = getRegionServer(RS); - - // create two RS nodes - String rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS.getServerName()); - zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS2.getServerName()); - zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - - for (int i = 0; i < maxTasks; i++) { - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i), - new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - - SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask); - slw.start(); - try { - int acquiredTasks = 0; - waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, WAIT_TIME); - for (int i = 0; i < maxTasks; i++) { - byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i)); - SplitLogTask slt = SplitLogTask.parseFrom(bytes); - if (slt.isOwned(RS)) { - acquiredTasks++; - } - } - assertEquals(2, acquiredTasks); - } finally { - stopSplitLogWorker(slw); - } - } - /** * Create a mocked region server service instance * @param server -- 2.22.0