diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java index bc27991..59e2d64 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java @@ -49,8 +49,21 @@ public class LogCleaner extends CleanerChore { public static final String OLD_WALS_CLEANER_SIZE = "hbase.oldwals.cleaner.thread.size"; public static final int OLD_WALS_CLEANER_DEFAULT_SIZE = 2; + public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = + "hbase.oldwals.cleaner.thread.timeout.msec"; + @VisibleForTesting + static final long DEFAULT_CLEANER_THREAD_MAX_WAIT_MSEC = 60 * 1000; + + public static final String OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC = + "hbase.oldwals.cleaner.thread.check.interval.msec"; + @VisibleForTesting + static final long DEFAULT_CHECK_INTERVAL_MSEC = 500; + + private final LinkedBlockingQueue pendingDelete; private List oldWALsCleaner; + private long cleanerTimeoutMsec; + private long cleanerCheckIntervalMsec; /** * @param period the period of time to sleep between each run @@ -65,6 +78,10 @@ public class LogCleaner extends CleanerChore { this.pendingDelete = new LinkedBlockingQueue<>(); int size = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE); this.oldWALsCleaner = createOldWalsCleaner(size); + this.cleanerTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, + DEFAULT_CLEANER_THREAD_MAX_WAIT_MSEC); + this.cleanerCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, + DEFAULT_CHECK_INTERVAL_MSEC); } @Override @@ -93,7 +110,8 @@ public class LogCleaner extends CleanerChore { protected int deleteFiles(Iterable filesToDelete) { List results = new LinkedList<>(); for (FileStatus toDelete : filesToDelete) { - CleanerContext context = 
CleanerContext.createCleanerContext(toDelete); + CleanerContext context = CleanerContext.createCleanerContext(toDelete, + cleanerTimeoutMsec); if (context != null) { pendingDelete.add(context); results.add(context); @@ -102,7 +120,7 @@ public class LogCleaner extends CleanerChore { int deletedFiles = 0; for (CleanerContext res : results) { - deletedFiles += res.getResult(500) ? 1 : 0; + deletedFiles += res.getResult(cleanerCheckIntervalMsec) ? 1 : 0; } return deletedFiles; } @@ -118,6 +136,16 @@ public class LogCleaner extends CleanerChore { return oldWALsCleaner.size(); } + @VisibleForTesting + long getCleanerTimeoutMsec() { + return cleanerTimeoutMsec; + } + + @VisibleForTesting + long getCleanerCheckIntervalMsec() { + return cleanerCheckIntervalMsec; + } + private List createOldWalsCleaner(int size) { LOG.info("Creating OldWALs cleaners with size=" + size); @@ -186,20 +214,20 @@ public class LogCleaner extends CleanerChore { } private static final class CleanerContext { - // At most waits 60 seconds - static final long MAX_WAIT = 60 * 1000; final FileStatus target; volatile boolean result; volatile boolean setFromCleaner = false; + long timeoutMsec; - static CleanerContext createCleanerContext(FileStatus status) { - return status != null ? new CleanerContext(status) : null; + static CleanerContext createCleanerContext(FileStatus status, long timeoutMsec) { + return status != null ? 
new CleanerContext(status, timeoutMsec) : null; } - private CleanerContext(FileStatus status) { + private CleanerContext(FileStatus status, long timeoutMsec) { this.target = status; this.result = false; + this.timeoutMsec = timeoutMsec; } synchronized void setResult(boolean res) { @@ -214,8 +242,8 @@ public class LogCleaner extends CleanerChore { while (!setFromCleaner) { wait(waitIfNotFinished); totalTime += waitIfNotFinished; - if (totalTime >= MAX_WAIT) { - LOG.warn("Spend too much time to delete oldwals " + target); + if (totalTime >= timeoutMsec) { + LOG.warn("Spent too much time ({} ms) to delete oldwals {}", totalTime, target); return result; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java index 0263085..c532562 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java @@ -280,6 +280,10 @@ public class TestLogsCleaner { public void testOnConfigurationChange() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE); + conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, + LogCleaner.DEFAULT_CLEANER_THREAD_MAX_WAIT_MSEC); + conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, + LogCleaner.DEFAULT_CHECK_INTERVAL_MSEC); // Prepare environments Server server = new DummyServer(); Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(), @@ -287,6 +291,8 @@ public class TestLogsCleaner { FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem(); LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir); assertEquals(LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE, cleaner.getSizeOfCleaners()); + 
assertEquals(LogCleaner.DEFAULT_CLEANER_THREAD_MAX_WAIT_MSEC, cleaner.getCleanerTimeoutMsec()); + assertEquals(LogCleaner.DEFAULT_CHECK_INTERVAL_MSEC, cleaner.getCleanerCheckIntervalMsec()); // Create dir and files for test fs.delete(oldWALsDir, true); fs.mkdirs(oldWALsDir);