From 8bd7dedeb12e19c5ecbb77acaf1d6cef7ddb504b Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Fri, 12 Jul 2019 12:26:00 -0700 Subject: [PATCH] HBASE-22686 ZkSplitLogWorkerCoordination doesn't allow a regionserver to pick up all of the split work it is capable of --- .../ZkSplitLogWorkerCoordination.java | 31 ++---------- .../regionserver/TestSplitLogWorker.java | 47 ------------------- 2 files changed, 3 insertions(+), 75 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java index f22a62a8b8..e54df9f2b1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java @@ -328,34 +328,10 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements } /** - * This function calculates how many splitters this RS should create based on expected average - * tasks per RS and the hard limit upper bound(maxConcurrentTasks) set by configuration.
- * At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound) - * @param numTasks total number of split tasks available - * @return number of tasks this RS can grab - */ - private int getNumExpectedTasksPerRS(int numTasks) { - // at lease one RS(itself) available - int availableRSs = 1; - try { - List regionServers = - ZKUtil.listChildrenNoWatch(watcher, watcher.rsZNode); - availableRSs = Math.max(availableRSs, (regionServers == null) ? 0 : regionServers.size()); - } catch (KeeperException e) { - // do nothing - LOG.debug("getAvailableRegionServers got ZooKeeper exception", e); - } - int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1); - return Math.max(1, expectedTasksPerRS); // at least be one - } - - /** - * @param expectedTasksPerRS Average number of tasks to be handled by each RS * @return true if more splitters are available, otherwise false. */ - private boolean areSplittersAvailable(int expectedTasksPerRS) { - return (Math.min(expectedTasksPerRS, maxConcurrentTasks) - - this.tasksInProgress.get()) > 0; + private boolean areSplittersAvailable() { + return maxConcurrentTasks - tasksInProgress.get() > 0; } /** @@ -436,11 +412,10 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements } } int numTasks = paths.size(); - int expectedTasksPerRS = getNumExpectedTasksPerRS(numTasks); boolean taskGrabbed = false; for (int i = 0; i < numTasks; i++) { while (!shouldStop) { - if (this.areSplittersAvailable(expectedTasksPerRS)) { + if (this.areSplittersAvailable()) { LOG.debug("Current region server " + server.getServerName() + " is ready to take more tasks, will get task list and try grab tasks again."); int idx = (i + offset) % paths.size(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java index 48be76b23a..e5f41ec439 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitLogWorker.java @@ -455,53 +455,6 @@ public class TestSplitLogWorker { } } - /** - * The test checks SplitLogWorker should not spawn more splitters than expected num of tasks per - * RS - * @throws Exception - */ - @Test(timeout=60000) - public void testAcquireMultiTasksByAvgTasksPerRS() throws Exception { - LOG.info("testAcquireMultiTasks"); - SplitLogCounters.resetCounters(); - final String TATAS = "tatas"; - final ServerName RS = ServerName.valueOf("rs,1,1"); - final ServerName RS2 = ServerName.valueOf("rs,1,2"); - final int maxTasks = 3; - Configuration testConf = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); - testConf.setInt("hbase.regionserver.wal.max.splitters", maxTasks); - RegionServerServices mockedRS = getRegionServer(RS); - - // create two RS nodes - String rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS.getServerName()); - zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - rsPath = ZKUtil.joinZNode(zkw.rsZNode, RS2.getServerName()); - zkw.getRecoverableZooKeeper().create(rsPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); - - for (int i = 0; i < maxTasks; i++) { - zkw.getRecoverableZooKeeper().create(ZKSplitLog.getEncodedNodeName(zkw, TATAS + i), - new SplitLogTask.Unassigned(ServerName.valueOf("mgr,1,1"), this.mode).toByteArray(), - Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); - } - - SplitLogWorker slw = new SplitLogWorker(ds, testConf, mockedRS, neverEndingTask); - slw.start(); - try { - int acquiredTasks = 0; - waitForCounter(SplitLogCounters.tot_wkr_task_acquired, 0, 2, WAIT_TIME); - for (int i = 0; i < maxTasks; i++) { - byte[] bytes = ZKUtil.getData(zkw, ZKSplitLog.getEncodedNodeName(zkw, TATAS + i)); - SplitLogTask slt = SplitLogTask.parseFrom(bytes); - if (slt.isOwned(RS)) { - acquiredTasks++; - } - } - assertEquals(2, acquiredTasks); - } finally { - stopSplitLogWorker(slw); - } - } - /** * Create a mocked region server service instance * @param server -- 2.22.0