From 5c0b3d9a65bdd945b620523bfa86c7e117f49b56 Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Fri, 12 Jul 2019 12:26:00 -0700 Subject: [PATCH] HBASE-22686 ZkSplitLogWorkerCoordination doesn't allow a regionserver to pick up all of the split work it is capable of --- .../ZkSplitLogWorkerCoordination.java | 31 ++---------- .../regionserver/TestSplitLogWorker.java | 47 ------------------- 2 files changed, 3 insertions(+), 75 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java index 7ceaaec362..8e196177fc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java @@ -324,34 +324,10 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements } /** - * This function calculates how many splitters this RS should create based on expected average - * tasks per RS and the hard limit upper bound(maxConcurrentTasks) set by configuration.
- * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound) - * @param numTasks total number of split tasks available - * @return number of tasks this RS can grab - */ - private int getNumExpectedTasksPerRS(int numTasks) { - // at lease one RS(itself) available - int availableRSs = 1; - try { - List regionServers = - ZKUtil.listChildrenNoWatch(watcher, watcher.getZNodePaths().rsZNode); - availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size()); - } catch (KeeperException e) { - // do nothing - LOG.debug("getAvailableRegionServers got ZooKeeper exception", e); - } - int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1); - return Math.max(1, expectedTasksPerRS); // at least be one - } - - /** - * @param expectedTasksPerRS Average number of tasks to be handled by each RS * @return true if more splitters are available, otherwise false. */ - private boolean areSplittersAvailable(int expectedTasksPerRS) { - return (Math.min(expectedTasksPerRS, maxConcurrentTasks) - - this.tasksInProgress.get()) > 0; + private boolean areSplittersAvailable() { + return maxConcurrentTasks - tasksInProgress.get() > 0; } /** @@ -432,11 +408,10 @@ public class ZkSplitLogWorkerCoordination extends ZKListener implements } } int numTasks = paths.size(); - int expectedTasksPerRS = getNumExpectedTasksPerRS(numTasks); boolean taskGrabbed = false; for (int i = 0; i < numTasks; i++) { while (!shouldStop) { - if (this.areSplittersAvailable(expectedTasksPerRS)) { + if (this.areSplittersAvailable()) { LOG.debug("Current region server " + server.getServerName() + " is ready to take more tasks, will get task list and try grab tasks again."); int idx = (i + offset) % paths.size(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index b52bf1977e..2c52bc0c21 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -471,53 +471,6 @@ public class TestSplitLogWorker { } } - /** - * The test checks SplitLogWorker should not spawn more splitters than expected num of tasks per - * RS - * @throws Exception - */ - @Test - public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception { - LOG.info("testAcquireMultiTasks"); - SplitLogCounters.resetCounters(); - final String TATAS = "tatas"; - final ServerName RS = ServerName.valueOf("rs,1,1"); - final ServerName RS2 = ServerName.valueOf("rs,1,2"); - final int maxTasks = 3; - Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); - testConf.setInt(HBASE_SPLIT_WAL_MAX_SPLITTER, maxTasks); - RegionServerServices mockedRS = getRegionServer(RS); - - // create two RS nodes - String rsPath = ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, RS.getServerName()); - zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - rsPath = ZNodePaths.joinZNode(zkw.getZNodePaths().rsZNode, RS2.getServerName()); - zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - - for (int i = 0; i < maxTasks; i++) { - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i), - new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1")).toByteArray(), - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - - SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask); - slw.start(); - try { - int acquiredTasks = 0; - waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, WAIT_TIME); - for (int i = 0; i < maxTasks; i++) { - byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i)); - SplitLogTask slt = SplitLogTask.parseFrom(bytes); - if (slt.isOwned(RS)) { - acquiredTasks++; - } - } - assertEquals(2, acquiredTasks); - } finally { - stopSplitLogWorker(slw); - } - } - /** * Create a mocked region server service instance */ -- 2.22.0