diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index defe851..dd4a615 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -66,6 +67,16 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> implements
       "hbase.regionserver.hfilecleaner.small.queue.size";
   public final static int DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE = 1048576;
 
+  public static final String HFILE_DELETE_THREAD_TIMEOUT_MSEC =
+      "hbase.regionserver.hfilecleaner.thread.timeout.msec";
+  @VisibleForTesting
+  static final long DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC = 60 * 1000L;
+
+  public static final String HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC =
+      "hbase.regionserver.hfilecleaner.thread.check.interval.msec";
+  @VisibleForTesting
+  static final long DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC = 1000L;
+
   private static final Log LOG = LogFactory.getLog(HFileCleaner.class);
 
   BlockingQueue largeFileQueue;
@@ -73,6 +84,8 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> implements
   private int throttlePoint;
   private int largeQueueSize;
   private int smallQueueSize;
+  private long cleanerThreadTimeoutMsec;
+  private long cleanerThreadCheckIntervalMsec;
   private List threads = new ArrayList();
   private boolean running;
 
@@ -99,6 +112,11 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> implements
         conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
     largeFileQueue = new LinkedBlockingQueue(largeQueueSize);
     smallFileQueue = new LinkedBlockingQueue(smallQueueSize);
+    cleanerThreadTimeoutMsec =
+        conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
+    cleanerThreadCheckIntervalMsec =
+        conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
+          DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
     startHFileDeleteThreads();
   }
 
@@ -130,7 +148,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> implements
     }
     // wait for each submitted task to finish
     for (HFileDeleteTask task : tasks) {
-      if (task.getResult()) {
+      if (task.getResult(cleanerThreadCheckIntervalMsec)) {
        deletedFiles++;
       }
     }
@@ -143,7 +161,7 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> implements
    * @return HFileDeleteTask to track progress
    */
   private HFileDeleteTask deleteFile(FileStatus file) {
-    HFileDeleteTask task = new HFileDeleteTask(file);
+    HFileDeleteTask task = new HFileDeleteTask(file, cleanerThreadTimeoutMsec);
     boolean enqueued = dispatch(task);
     return enqueued ? task : null;
   }
@@ -273,17 +291,17 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> implements
   }
 
   static class HFileDeleteTask {
-    private static final long MAX_WAIT = 60 * 1000L;
-    private static final long WAIT_UNIT = 1000L;
 
     boolean done = false;
     boolean result;
     final Path filePath;
     final long fileLength;
+    final long timeoutMsec;
 
-    public HFileDeleteTask(FileStatus file) {
+    public HFileDeleteTask(FileStatus file, long timeoutMsec) {
       this.filePath = file.getPath();
       this.fileLength = file.getLen();
+      this.timeoutMsec = timeoutMsec;
     }
 
     public synchronized void setResult(boolean result) {
@@ -292,17 +310,19 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> implements
     }
 
-    public synchronized boolean getResult() {
-      long waitTime = 0;
+    public synchronized boolean getResult(long waitIfNotFinished) {
+      long waitTimeMsec = 0;
       try {
         while (!done) {
-          wait(WAIT_UNIT);
-          waitTime += WAIT_UNIT;
+          long startTimeNanos = System.nanoTime();
+          wait(waitIfNotFinished);
+          waitTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
+            TimeUnit.NANOSECONDS);
           if (done) {
             return this.result;
           }
-          if (waitTime > MAX_WAIT) {
-            LOG.warn("Wait more than " + MAX_WAIT + " ms for deleting " + this.filePath
+          if (waitTimeMsec > timeoutMsec) {
+            LOG.warn("Wait more than " + timeoutMsec + " ms for deleting " + this.filePath
               + ", exit...");
             return false;
           }
         }
@@ -346,6 +366,16 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> implements
     return throttlePoint;
   }
 
+  @VisibleForTesting
+  long getCleanerThreadTimeoutMsec() {
+    return cleanerThreadTimeoutMsec;
+  }
+
+  @VisibleForTesting
+  long getCleanerThreadCheckIntervalMsec() {
+    return cleanerThreadCheckIntervalMsec;
+  }
+
   @Override
   public void onConfigurationChange(Configuration conf) {
     StringBuilder builder = new StringBuilder();
@@ -359,6 +389,12 @@ public class HFileCleaner extends CleanerChore<BaseHFileCleanerDelegate> implements
         conf.getInt(LARGE_HFILE_DELETE_QUEUE_SIZE, DEFAULT_LARGE_HFILE_DELETE_QUEUE_SIZE);
     this.smallQueueSize =
         conf.getInt(SMALL_HFILE_DELETE_QUEUE_SIZE, DEFAULT_SMALL_HFILE_DELETE_QUEUE_SIZE);
+    this.cleanerThreadTimeoutMsec =
+        conf.getLong(HFILE_DELETE_THREAD_TIMEOUT_MSEC, DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC);
+    this.cleanerThreadCheckIntervalMsec =
+        conf.getLong(HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
+          DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC);
+
     // record the left over tasks
     List leftOverTasks = new ArrayList<>();
     for (HFileDeleteTask task : largeFileQueue) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
index 0c30f95..8b60803 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,11 +47,24 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
   private static final Log LOG = LogFactory.getLog(LogCleaner.class.getName());
 
-  public static final String OLD_WALS_CLEANER_SIZE = "hbase.oldwals.cleaner.thread.size";
-  public static final int OLD_WALS_CLEANER_DEFAULT_SIZE = 2;
+  public static final String OLD_WALS_CLEANER_THREAD_SIZE =
+      "hbase.oldwals.cleaner.thread.size";
+  public static final int DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE = 2;
+
+  public static final String OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC =
+      "hbase.oldwals.cleaner.thread.timeout.msec";
+  @VisibleForTesting
+  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC = 60 * 1000L;
+
+  public static final String OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC =
+      "hbase.oldwals.cleaner.thread.check.interval.msec";
+  @VisibleForTesting
+  static final long DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC = 500L;
+
   private final LinkedBlockingQueue pendingDelete;
   private List oldWALsCleaner;
+  private long cleanerThreadTimeoutMsec;
+  private long cleanerThreadCheckIntervalMsec;
 
   /**
    * @param p the period of time to sleep between each run
@@ -63,8 +77,12 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
       Path oldLogDir) {
     super("LogsCleaner", p, s, conf, fs, oldLogDir, HBASE_MASTER_LOGCLEANER_PLUGINS);
     this.pendingDelete = new LinkedBlockingQueue<>();
-    int size = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE);
+    int size = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
     this.oldWALsCleaner = createOldWalsCleaner(size);
+    this.cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
+        DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
+    this.cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
+        DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
   }
 
   @Override
@@ -76,7 +94,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
   public void onConfigurationChange(Configuration conf) {
     super.onConfigurationChange(conf);
 
-    int newSize = conf.getInt(OLD_WALS_CLEANER_SIZE, OLD_WALS_CLEANER_DEFAULT_SIZE);
+    int newSize = conf.getInt(OLD_WALS_CLEANER_THREAD_SIZE, DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
     if (newSize == oldWALsCleaner.size()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Size from configuration is the same as previous which is " +
@@ -86,13 +104,18 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
     }
     interruptOldWALsCleaner();
     oldWALsCleaner = createOldWalsCleaner(newSize);
+    cleanerThreadTimeoutMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
+        DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
+    cleanerThreadCheckIntervalMsec = conf.getLong(OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
+        DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
   }
 
   @Override
   protected int deleteFiles(Iterable filesToDelete) {
     List results = new LinkedList<>();
     for (FileStatus toDelete : filesToDelete) {
-      CleanerContext context = CleanerContext.createCleanerContext(toDelete);
+      CleanerContext context = CleanerContext.createCleanerContext(toDelete,
+        cleanerThreadTimeoutMsec);
       if (context != null) {
         pendingDelete.add(context);
         results.add(context);
       }
@@ -101,7 +124,7 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
 
     int deletedFiles = 0;
     for (CleanerContext res : results) {
-      deletedFiles += res.getResult(500) ? 1 : 0;
+      deletedFiles += res.getResult(cleanerThreadCheckIntervalMsec) ? 1 : 0;
     }
     return deletedFiles;
   }
@@ -117,6 +140,16 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
     return oldWALsCleaner.size();
   }
 
+  @VisibleForTesting
+  long getCleanerThreadTimeoutMsec() {
+    return cleanerThreadTimeoutMsec;
+  }
+
+  @VisibleForTesting
+  long getCleanerThreadCheckIntervalMsec() {
+    return cleanerThreadCheckIntervalMsec;
+  }
+
   private List createOldWalsCleaner(int size) {
     LOG.info("Creating OldWALs cleaners with size=" + size);
 
@@ -190,20 +223,20 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
   }
 
   private static final class CleanerContext {
-    // At most waits 60 seconds
-    static final long MAX_WAIT = 60 * 1000;
 
     final FileStatus target;
     volatile boolean result;
     volatile boolean setFromCleaner = false;
+    long timeoutMsec;
 
-    static CleanerContext createCleanerContext(FileStatus status) {
-      return status != null ? new CleanerContext(status) : null;
+    static CleanerContext createCleanerContext(FileStatus status, long timeoutMsec) {
+      return status != null ? new CleanerContext(status, timeoutMsec) : null;
     }
 
-    private CleanerContext(FileStatus status) {
+    private CleanerContext(FileStatus status, long timeoutMsec) {
       this.target = status;
       this.result = false;
+      this.timeoutMsec = timeoutMsec;
     }
 
     synchronized void setResult(boolean res) {
@@ -213,13 +246,15 @@ public class LogCleaner extends CleanerChore<BaseLogCleanerDelegate> {
     }
 
     synchronized boolean getResult(long waitIfNotFinished) {
-      long totalTime = 0;
+      long totalTimeMsec = 0;
       try {
         while (!setFromCleaner) {
+          long startTimeNanos = System.nanoTime();
           wait(waitIfNotFinished);
-          totalTime += waitIfNotFinished;
-          if (totalTime >= MAX_WAIT) {
-            LOG.warn("Spend too much time to delete oldwals " + target);
+          totalTimeMsec += TimeUnit.MILLISECONDS.convert(System.nanoTime() - startTimeNanos,
+            TimeUnit.NANOSECONDS);
+          if (totalTimeMsec >= timeoutMsec) {
+            LOG.warn("Spend too much time " + totalTimeMsec + " ms to delete oldwals " + target);
             return result;
           }
         }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
index 5712729..665a4aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java
@@ -321,6 +321,8 @@ public class TestHFileCleaner {
     final int UPDATE_QUEUE_SIZE = 1024;
     final int LARGE_FILE_NUM = 5;
     final int SMALL_FILE_NUM = 20;
+    final long THREAD_TIMEOUT_MSEC = 30 * 1000L;
+    final long THREAD_CHECK_INTERVAL_MSEC = 500L;
 
     Configuration conf = UTIL.getConfiguration();
     // no cleaner policies = delete all files
@@ -338,6 +340,10 @@ public class TestHFileCleaner {
     Assert.assertEquals(ORIGINAL_THROTTLE_POINT, cleaner.getThrottlePoint());
     Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getLargeQueueSize());
     Assert.assertEquals(ORIGINAL_QUEUE_SIZE, cleaner.getSmallQueueSize());
+    Assert.assertEquals(HFileCleaner.DEFAULT_HFILE_DELETE_THREAD_TIMEOUT_MSEC,
+        cleaner.getCleanerThreadTimeoutMsec());
+    Assert.assertEquals(HFileCleaner.DEFAULT_HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
+        cleaner.getCleanerThreadCheckIntervalMsec());
 
     // clean up archive directory and create files for testing
     fs.delete(archivedHfileDir, true);
@@ -361,6 +367,9 @@ public class TestHFileCleaner {
     newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT);
     newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
     newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_QUEUE_SIZE, UPDATE_QUEUE_SIZE);
+    newConf.setLong(HFileCleaner.HFILE_DELETE_THREAD_TIMEOUT_MSEC, THREAD_TIMEOUT_MSEC);
+    newConf.setLong(HFileCleaner.HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC,
+        THREAD_CHECK_INTERVAL_MSEC);
     cleaner.onConfigurationChange(newConf);
     LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles()
         + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles());
@@ -369,6 +378,8 @@ public class TestHFileCleaner {
     Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint());
     Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getLargeQueueSize());
     Assert.assertEquals(UPDATE_QUEUE_SIZE, cleaner.getSmallQueueSize());
+    Assert.assertEquals(THREAD_TIMEOUT_MSEC, cleaner.getCleanerThreadTimeoutMsec());
+    Assert.assertEquals(THREAD_CHECK_INTERVAL_MSEC, cleaner.getCleanerThreadCheckIntervalMsec());
     Assert.assertEquals(2, cleaner.getCleanerThreads().size());
 
     // wait until clean done and check
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index db15e95..2578ec9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -242,7 +242,7 @@ public class TestLogsCleaner {
         new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
         new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
     );
-    
+
     ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testZooKeeperAbort-normal", null);
     try {
       cleaner.setConf(conf, zkw);
@@ -261,14 +261,23 @@ public class TestLogsCleaner {
   @Test
   public void testOnConfigurationChange() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
-    conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE);
+    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE,
+        LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE);
+    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
+        LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC);
+    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
+        LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC);
     // Prepare environments
     Server server = new DummyServer();
     Path oldWALsDir = new Path(TEST_UTIL.getDefaultRootDirPath(),
         HConstants.HREGION_OLDLOGDIR_NAME);
     FileSystem fs = TEST_UTIL.getDFSCluster().getFileSystem();
     final LogCleaner cleaner = new LogCleaner(3000, server, conf, fs, oldWALsDir);
-    assertEquals(LogCleaner.OLD_WALS_CLEANER_DEFAULT_SIZE, cleaner.getSizeOfCleaners());
+    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_SIZE, cleaner.getSizeOfCleaners());
+    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC,
+        cleaner.getCleanerThreadTimeoutMsec());
+    assertEquals(LogCleaner.DEFAULT_OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
+        cleaner.getCleanerThreadCheckIntervalMsec());
     // Create dir and files for test
     fs.delete(oldWALsDir, true);
     fs.mkdirs(oldWALsDir);
@@ -287,9 +296,16 @@ public class TestLogsCleaner {
     thread.start();
     // change size of cleaners dynamically
     int sizeToChange = 4;
-    conf.setInt(LogCleaner.OLD_WALS_CLEANER_SIZE, sizeToChange);
+    long threadTimeoutToChange = 30 * 1000L;
+    long threadCheckIntervalToChange = 250L;
+    conf.setInt(LogCleaner.OLD_WALS_CLEANER_THREAD_SIZE, sizeToChange);
+    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, threadTimeoutToChange);
+    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC,
+        threadCheckIntervalToChange);
     cleaner.onConfigurationChange(conf);
     assertEquals(sizeToChange, cleaner.getSizeOfCleaners());
+    assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
+    assertEquals(threadCheckIntervalToChange, cleaner.getCleanerThreadCheckIntervalMsec());
     // Stop chore
     thread.join();
     status = fs.listStatus(oldWALsDir);
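
For reference, a minimal sketch (not part of the patch) of how the new keys could be overridden programmatically, mirroring what the updated tests do; the class name and the 30000/500 values are arbitrary illustrative choices, not recommended defaults.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.master.cleaner.LogCleaner;

public class CleanerTimeoutConfigExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Wait up to 30 s for each queued HFile delete task, polling its status every 500 ms
    // (keys introduced by this patch; previously hard-coded to 60 s / 1 s).
    conf.setLong(HFileCleaner.HFILE_DELETE_THREAD_TIMEOUT_MSEC, 30 * 1000L);
    conf.setLong(HFileCleaner.HFILE_DELETE_THREAD_CHECK_INTERVAL_MSEC, 500L);
    // Same knobs for the oldWALs cleaner threads (previously 60 s / 500 ms).
    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_TIMEOUT_MSEC, 30 * 1000L);
    conf.setLong(LogCleaner.OLD_WALS_CLEANER_THREAD_CHECK_INTERVAL_MSEC, 500L);
  }
}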