diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java index bc27991..28a2308 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java @@ -46,11 +46,24 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti public class LogCleaner extends CleanerChore { private static final Logger LOG = LoggerFactory.getLogger(LogCleaner.class.getName()); - public static final String OLD_WALS_CLEANER_SIZE = "hbase.oldwals.cleaner.thread.size"; - public static final int OLD_WALS_CLEANER_DEFAULT_SIZE = 2; + public static final String OLD_WALS_CLEANER_THREAD_SIZE = "hbase.oldwals.cleaner.thread.size"; + public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2; + + public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = + "hbase.oldwals.cleaner.thread.timeout.msec"; + @VisibleForTesting + static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000; + + public static final String OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC = + "hbase.oldwals.cleaner.thread.check.interval.msec"; + @VisibleForTesting + static final long DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC = 500; + private final LinkedBlockingQueue pendingDelete; private List oldWALsCleaner; + private long cleanerTimeoutMsec; + private long cleanerCheckIntervalMsec; /** * @param period the period of time to sleep between each run @@ -63,8 +76,12 @@ public class LogCleaner extends CleanerChore { Path oldLogDir) { super("LogsCleaner", period, stopper, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS); this.pendingDelete = new LinkedBlockingQueue<>(); - int size = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE); + int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); this.oldWALsCleaner = createOldWalsCleaner(size); + this.cleanerTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, + DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC); + this.cleanerCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, + DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC); } @Override @@ -77,7 +94,7 @@ public class LogCleaner extends CleanerChore { public void onConfigurationChange(Configuration conf) { super.onConfigurationChange(conf); - int newSize = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE); + int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); if (newSize == oldWALsCleaner.size()) { if (LOG.isDebugEnabled()) { LOG.debug("Size from configuration is the same as previous which is " + @@ -93,7 +110,8 @@ public class LogCleaner extends CleanerChore { protected int deleteFiles(Iterable filesToDelete) { List results = new LinkedList<>(); for (FileStatus toDelete : filesToDelete) { - CleanerContext context = CleanerContext.createCleanerContext(toDelete); + CleanerContext context = CleanerContext.createCleanerContext(toDelete, + cleanerTimeoutMsec); if (context != null) { pendingDelete.add(context); results.add(context); @@ -102,7 +120,7 @@ public class LogCleaner extends CleanerChore { int deletedFiles = 0; for (CleanerContext res : results) { - deletedFiles += res.getResult(500) ? 1 : 0; + deletedFiles += res.getResult(cleanerCheckIntervalMsec) ? 1 : 0; } return deletedFiles; } @@ -118,6 +136,16 @@ public class LogCleaner extends CleanerChore { return oldWALsCleaner.size(); } + @VisibleForTesting + long getCleanerTimeoutMsec() { + return cleanerTimeoutMsec; + } + + @VisibleForTesting + long getCleanerCheckIntervalMsec() { + return cleanerCheckIntervalMsec; + } + private List createOldWalsCleaner(int size) { LOG.info("Creating OldWALs cleaners with size=" + size); @@ -186,20 +214,20 @@ public class LogCleaner extends CleanerChore { } private static final class CleanerContext { - // At most waits 60 seconds - static final long MAX_WAIT = 60 * 1000; final FileStatus target; volatile boolean result; volatile boolean setFromCleaner = false; + long timeoutMsec; - static CleanerContext createCleanerContext(FileStatus status) { - return status != null ? new CleanerContext(status) : null; + static CleanerContext createCleanerContext(FileStatus status, long timeoutMsec) { + return status != null ? new CleanerContext(status, timeoutMsec) : null; } - private CleanerContext(FileStatus status) { + private CleanerContext(FileStatus status, long timeoutMsec) { this.target = status; this.result = false; + this.timeoutMsec = timeoutMsec; } synchronized void setResult(boolean res) { @@ -214,8 +242,8 @@ public class LogCleaner extends CleanerChore { while (!setFromCleaner) { wait(waitIfNotFinished); totalTime += waitIfNotFinished; - if (totalTime >= MAX_WAIT) { - LOG.warn("Spend too much time to delete oldwals " + target); + if (totalTime >= timeoutMsec) { + LOG.warn("Spend too much time {} ms to delete oldwals {}", totalTime, target); return result; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 0263085..9a44a60 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -279,14 +279,23 @@ public class TestLogsCleaner { @Test public void testOnConfigurationChange() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); - conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE); + conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, + LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE); + conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, + LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC); + conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, + LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC); // Prepare environments Server server = new DummyServer(); Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(), HConstants.HREGION_OLDLOGDIR_NAME); FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir); - assertEquals(LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE, cleaner.getSizeOfCleaners()); + assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE, cleaner.getSizeOfCleaners()); + assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, + cleaner.getCleanerTimeoutMsec()); + assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, + cleaner.getCleanerCheckIntervalMsec()); // Create dir and files for test fs.delete(oldWALsDir, true); fs.mkdirs(oldWALsDir); @@ -300,7 +309,7 @@ public class TestLogsCleaner { thread.start(); // change size of cleaners dynamically int sizeToChange = 4; - conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, sizeToChange); + conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, sizeToChange); cleaner.onConfigurationChange(conf); assertEquals(sizeToChange, cleaner.getSizeOfCleaners()); // Stop chore