From 494fdacc395ddbc7656edcbe502ccd04341f8ac0 Mon Sep 17 00:00:00 2001
From: Reid Chan
Date: Fri, 2 Mar 2018 14:56:49 +0800
Subject: [PATCH] HBASE-20095 Redesign single instance pool in CleanerChore

---
 .../org/apache/hadoop/hbase/master/HMaster.java     | 39 +++++++++--------
 .../hadoop/hbase/master/cleaner/CleanerChore.java   | 50 +++++++++++-----------
 .../example/TestZooKeeperTableArchiveClient.java    |  4 ++
 .../hbase/master/cleaner/TestCleanerChore.java      |  6 +++
 .../hbase/master/cleaner/TestHFileCleaner.java      |  1 +
 .../hbase/master/cleaner/TestHFileLinkCleaner.java  |  1 +
 .../hbase/master/cleaner/TestLogsCleaner.java       |  1 +
 7 files changed, 60 insertions(+), 42 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index b0dd0b40e9..a3bdc2c622 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -107,6 +107,7 @@ import org.apache.hadoop.hbase.master.balancer.BalancerChore;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
 import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore;
 import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory;
+import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
 import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
 import org.apache.hadoop.hbase.master.locking.LockManager;
@@ -1124,35 +1125,37 @@ public class HMaster extends HRegionServer implements MasterServices {
    * need to install an unexpected exception handler.
    */
   private void startServiceThreads() throws IOException{
-    // Start the executor service pools
-    this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
+    // Start the executor service pools
+    this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
       conf.getInt("hbase.master.executor.openregion.threads", 5));
-    this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
+    this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
       conf.getInt("hbase.master.executor.closeregion.threads", 5));
-    this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
+    this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
       conf.getInt("hbase.master.executor.serverops.threads", 5));
-    this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
+    this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
       conf.getInt("hbase.master.executor.meta.serverops.threads", 5));
-    this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
+    this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
       conf.getInt("hbase.master.executor.logreplayops.threads", 10));
-    // We depend on there being only one instance of this executor running
-    // at a time. To do concurrency, would need fencing of enable/disable of
-    // tables.
-    // Any time changing this maxThreads to > 1, pls see the comment at
-    // AccessController#postCompletedCreateTableAction
-    this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
-    startProcedureExecutor();
-
-    // Start log cleaner thread
-    int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000);
-    this.logCleaner =
+    // We depend on there being only one instance of this executor running
+    // at a time. To do concurrency, would need fencing of enable/disable of
+    // tables.
+    // Any time changing this maxThreads to > 1, pls see the comment at
+    // AccessController#postCompletedCreateTableAction
+    this.executorService.startExecutorService(ExecutorType.MASTER_TABLE_OPERATIONS, 1);
+    startProcedureExecutor();
+
+    // Initial cleaner chore
+    CleanerChore.initChorePool(conf);
+    // Start log cleaner thread.
+    int cleanerInterval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000);
+    this.logCleaner =
       new LogCleaner(cleanerInterval,
          this, conf, getMasterWalManager().getFileSystem(),
          getMasterWalManager().getOldLogDir());
     getChoreService().scheduleChore(logCleaner);
 
-    //start the hfile archive cleaner thread
+    //start the hfile archive cleaner thread
     Path archiveDir = HFileArchiveUtil.getArchivePath(conf);
     Map<String, Object> params = new HashMap<>();
     params.put(MASTER, this);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index fdf5141734..763c5d24fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.master.cleaner;
 
 import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
@@ -50,8 +51,6 @@ import org.slf4j.LoggerFactory;
  * Abstract Cleaner that uses a chain of delegates to clean a directory of files
  * @param <T> Cleaner delegate class that is dynamically loaded from configuration
  */
-@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD",
-    justification="TODO: Fix. It is wonky have static pool initialized from instance")
 public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore
     implements ConfigurationObserver {
 
@@ -69,8 +68,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
 
   // It may be waste resources for each cleaner chore own its pool,
   // so let's make pool for all cleaner chores.
-  private static volatile ForkJoinPool CHOREPOOL;
-  private static volatile int CHOREPOOLSIZE;
+  private static volatile ForkJoinPool pool;
+  private static volatile int poolSize;
 
   protected final FileSystem fs;
   private final Path oldFileDir;
@@ -80,6 +79,16 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   private final AtomicBoolean reconfig = new AtomicBoolean(false);
   protected List<T> cleanersChain;
 
+  public static void initChorePool(Configuration conf) {
+    String size = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
+    poolSize = calculatePoolSize(size);
+    // poolSize may be 0 or 0.0 from a careless configuration,
+    // double check to make sure.
+    poolSize = poolSize == 0 ? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE) : poolSize;
+    pool = new ForkJoinPool(poolSize);
+    LOG.info("Cleaner pool size is {}", poolSize);
+  }
+
   public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
       FileSystem fs, Path oldFileDir, String confKey) {
     this(name, sleepPeriod, s, conf, fs, oldFileDir, confKey, null);
   }
@@ -98,21 +107,14 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   public CleanerChore(String name, final int sleepPeriod, final Stoppable s, Configuration conf,
       FileSystem fs, Path oldFileDir, String confKey, Map<String, Object> params) {
     super(name, s, sleepPeriod);
+
+    Preconditions.checkNotNull(pool, "Chore's pool isn't initialized, please call" +
+      "CleanerChore.initChorePool(Configuration) before new a cleaner chore.");
     this.fs = fs;
     this.oldFileDir = oldFileDir;
     this.conf = conf;
     this.params = params;
     initCleanerChain(confKey);
-
-    if (CHOREPOOL == null) {
-      String poolSize = conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE);
-      CHOREPOOLSIZE = calculatePoolSize(poolSize);
-      // poolSize may be 0 or 0.0 from a careless configuration,
-      // double check to make sure.
-      CHOREPOOLSIZE = CHOREPOOLSIZE == 0? calculatePoolSize(DEFAULT_CHORE_POOL_SIZE): CHOREPOOLSIZE;
-      this.CHOREPOOL = new ForkJoinPool(CHOREPOOLSIZE);
-      LOG.info("Cleaner pool size is {}", CHOREPOOLSIZE);
-    }
   }
 
   /**
@@ -174,12 +176,12 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   @Override
   public void onConfigurationChange(Configuration conf) {
     int updatedSize = calculatePoolSize(conf.get(CHORE_POOL_SIZE, DEFAULT_CHORE_POOL_SIZE));
-    if (updatedSize == CHOREPOOLSIZE) {
+    if (updatedSize == poolSize) {
       LOG.trace("Size from configuration is same as previous={}, no need to update.", updatedSize);
       return;
     }
-    CHOREPOOLSIZE = updatedSize;
-    if (CHOREPOOL.getPoolSize() == 0) {
+    poolSize = updatedSize;
+    if (pool.getPoolSize() == 0) {
       // Chore does not work now, update it directly.
       updateChorePoolSize(updatedSize);
       return;
     }
@@ -188,10 +190,10 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
     reconfig.set(true);
   }
 
-  private void updateChorePoolSize(int updatedSize) {
-    CHOREPOOL.shutdownNow();
-    LOG.info("Update chore's pool size from {} to {}", CHOREPOOL.getParallelism(), updatedSize);
-    CHOREPOOL = new ForkJoinPool(updatedSize);
+  private static void updateChorePoolSize(int updatedSize) {
+    pool.shutdownNow();
+    LOG.info("Update chore's pool size from {} to {}", pool.getParallelism(), updatedSize);
+    pool = new ForkJoinPool(updatedSize);
   }
 
   /**
@@ -227,7 +229,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
       }
       // After each clean chore, checks if receives reconfigure notification while cleaning
       if (reconfig.compareAndSet(true, false)) {
-        updateChorePoolSize(CHOREPOOLSIZE);
+        updateChorePoolSize(poolSize);
       }
     } else {
       LOG.debug("Cleaner chore disabled! Not cleaning.");
Not cleaning."); @@ -241,7 +243,7 @@ public abstract class CleanerChore extends Schedu public Boolean runCleaner() { preRunCleaner(); CleanerTask task = new CleanerTask(this.oldFileDir, true); - CHOREPOOL.submit(task); + pool.submit(task); return task.join(); } @@ -373,7 +375,7 @@ public abstract class CleanerChore extends Schedu @VisibleForTesting int getChorePoolSize() { - return CHOREPOOLSIZE; + return poolSize; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java index f3e193e1f4..a47c16de3f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; +import org.apache.hadoop.hbase.master.cleaner.CleanerChore; import org.apache.hadoop.hbase.master.cleaner.HFileCleaner; import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -66,6 +67,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import sun.misc.Cleaner; /** * Spin up a small cluster and check that the hfiles of region are properly long-term archived as @@ -175,6 +177,7 @@ public class TestZooKeeperTableArchiveClient { Configuration conf = UTIL.getConfiguration(); // setup the delegate Stoppable stop = new StoppableImplementation(); + CleanerChore.initChorePool(conf); HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); List cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); @@ -229,6 +232,7 @@ public class TestZooKeeperTableArchiveClient { // setup the delegate Stoppable stop = new StoppableImplementation(); final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); + CleanerChore.initChorePool(conf); HFileCleaner cleaner = setupAndCreateCleaner(conf, fs, archiveDir, stop); List cleaners = turnOnArchiving(STRING_TABLE_NAME, cleaner); final LongTermArchivingHFileCleaner delegate = (LongTermArchivingHFileCleaner) cleaners.get(0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java index 22fa292a21..9438319187 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.StoppableImplementation; import org.junit.After; +import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -55,6 +56,11 @@ public class TestCleanerChore { private static final Logger LOG = LoggerFactory.getLogger(TestCleanerChore.class); private static final HBaseTestingUtility UTIL = new 
+  @Before
+  public void setup() throws Exception {
+    CleanerChore.initChorePool(UTIL.getConfiguration());
+  }
+
   @After
   public void cleanup() throws Exception {
     // delete and recreate the test directory, ensuring a clean test dir between tests
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 32480ea1bb..465e1932a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -68,6 +68,7 @@ public class TestHFileCleaner {
   public static void setupCluster() throws Exception {
     // have to use a minidfs cluster because the localfs doesn't modify file times correctly
     UTIL.startMiniDFSCluster(1);
+    CleanerChore.initChorePool(UTIL.getConfiguration());
   }
 
   @AfterClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
index 667a33e893..c011ea8da7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileLinkCleaner.java
@@ -108,6 +108,7 @@ public class TestHFileLinkCleaner {
     final long ttl = 1000;
     conf.setLong(TimeToLiveHFileCleaner.TTL_CONF_KEY, ttl);
     Server server = new DummyServer();
+    CleanerChore.initChorePool(conf);
     HFileCleaner cleaner = new HFileCleaner(1000, server, conf, fs, archiveDir);
 
     // Link backref cannot be removed
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 7423d26933..0263085aec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -85,6 +85,7 @@ public class TestLogsCleaner {
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.startMiniZKCluster();
     TEST_UTIL.startMiniDFSCluster(1);
+    CleanerChore.initChorePool(TEST_UTIL.getConfiguration());
   }
 
   @AfterClass
-- 
2.15.0
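
Note on the initialization contract introduced by this patch: creation of the shared ForkJoinPool moves out of the CleanerChore constructor and into the static CleanerChore.initChorePool(Configuration) method, so the pool is built exactly once before any cleaner chore is constructed; the Preconditions.checkNotNull added to the constructor makes a missing call fail fast. HMaster makes the call in startServiceThreads() just before it builds the LogCleaner (and, further down, the HFileCleaner), and each test above makes it in its setup method. The following is a minimal, illustrative Java sketch of that call order; the archive path, the anonymous Stoppable and the standalone main() wrapper are assumptions for the example, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.Stoppable;
    import org.apache.hadoop.hbase.master.cleaner.CleanerChore;
    import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;

    public class CleanerPoolBootstrapSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // 1. Build the shared cleaner pool exactly once, before any cleaner chore exists.
        //    With this patch, skipping this step makes the CleanerChore constructor fail
        //    its Preconditions.checkNotNull(pool, ...) check.
        CleanerChore.initChorePool(conf);

        // 2. Only now is it safe to construct concrete chores; they all share the one pool.
        FileSystem fs = FileSystem.get(conf);
        Path archiveDir = new Path("/hbase/archive"); // assumed path, for illustration only
        int interval = conf.getInt("hbase.master.cleaner.interval", 600 * 1000);
        Stoppable stopper = new Stoppable() {         // trivial stand-in for the master
          private volatile boolean stopped = false;
          @Override public void stop(String why) { stopped = true; }
          @Override public boolean isStopped() { return stopped; }
        };
        HFileCleaner hfileCleaner = new HFileCleaner(interval, stopper, conf, fs, archiveDir);

        // 3. Scheduling is unchanged: hand the chore to a ChoreService, as
        //    HMaster#startServiceThreads does via getChoreService().scheduleChore(...).
      }
    }

Because the pool is a single static resource, onConfigurationChange only rebuilds it immediately when the pool is idle; otherwise the reconfig flag defers the resize until the end of the next chore run, as shown in the CleanerChore hunks above.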