From e92ec28f1a425b539bfbf56169a6c67a476e1b74 Mon Sep 17 00:00:00 2001
From: Reid Chan
Date: Mon, 6 Nov 2017 19:16:15 +0800
Subject: [PATCH] HBASE-18309 Support multi threads to clean oldWALs

---
 .../hadoop/hbase/master/cleaner/CleanerChore.java  | 321 ++++++++++++---------
 .../hbase/master/cleaner/TestCleanerChore.java     |   7 +-
 2 files changed, 188 insertions(+), 140 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index b8ca1ecfc2..92de583d51 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -17,18 +17,20 @@
  */
 package org.apache.hadoop.hbase.master.cleaner;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Predicate;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.RecursiveTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
@@ -49,6 +51,12 @@ import org.apache.hadoop.ipc.RemoteException;
 public abstract class CleanerChore<T extends FileCleanerDelegate> extends ScheduledChore {
 
   private static final Log LOG = LogFactory.getLog(CleanerChore.class.getName());
+  private static final String CLEANER_POOL_SIZE = "hbase.logcleaner.threadpool.size";
+  private static final String DEFAULT_CLEANER_POOL_SIZE = "0.5";
+
+  // It would waste resources for each cleaner chore to own its own pool,
+  // so one pool is shared by all cleaner chores.
+  private static ForkJoinPool cleanerPool;
 
   protected final FileSystem fs;
   private final Path oldFileDir;
@@ -80,8 +88,43 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
     this.conf = conf;
     this.params = params;
     initCleanerChain(confKey);
+
+    if (cleanerPool == null) {
+      String poolSize = conf.get(CLEANER_POOL_SIZE, DEFAULT_CLEANER_POOL_SIZE);
+      int maxPoolSize = calculatePoolSize(poolSize);
+      // poolSize may be 0 or 0.0 from a careless configuration,
+      // double check to make sure.
+      maxPoolSize = maxPoolSize == 0 ? calculatePoolSize(DEFAULT_CLEANER_POOL_SIZE) : maxPoolSize;
+      cleanerPool = new ForkJoinPool(maxPoolSize);
+    }
   }
 
+  /**
+   * Calculate the size of the cleaner pool.
+   * If poolSize is an integer >= 1, it is used directly as the pool size;
+   * if 0 < poolSize < 1, the pool size is available processors * poolSize.
+   * @param poolSize size from configuration
+   * @return size of pool after calculation
+   */
+  private int calculatePoolSize(String poolSize) {
+    try {
+      // if poolSize is an integer, return it directly.
+      if (poolSize.matches("[0-9]+")) {
+        return Integer.valueOf(poolSize);
+      } else if (poolSize.matches("0\\.[0-9]+")) {
+        // if poolSize is a double, return poolSize * availableProcessors;
+        int coreSize = Runtime.getRuntime().availableProcessors();
+        return (int) (coreSize * Double.valueOf(poolSize));
+      } else {
+        throw new Exception(poolSize + " is neither an integer nor a double.");
+      }
+    } catch (Exception e) {
+      // in case of any exception, e.g. a number format exception.
+      LOG.error("Unrecognized value: " + poolSize + " for " + CLEANER_POOL_SIZE +
+          ", use default config: " + DEFAULT_CLEANER_POOL_SIZE + " instead.");
+    }
+    return calculatePoolSize(DEFAULT_CLEANER_POOL_SIZE);
+  }
 
   /**
    * Validate the file to see if it even belongs in the directory. If it is valid, then the file
@@ -135,7 +178,13 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   @Override
   protected void chore() {
     if (getEnabled()) {
-      runCleaner();
+      if (runCleaner()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Cleaned old files/dirs under " + oldFileDir + " successfully.");
+        }
+      } else {
+        LOG.warn("Failed to fully clean old files/dirs under " + oldFileDir + ".");
+      }
     } else {
       LOG.debug("Cleaner chore disabled! Not cleaning.");
     }
@@ -147,138 +196,9 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
 
   public Boolean runCleaner() {
     preRunCleaner();
-    try {
-      FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
-      checkAndDeleteEntries(files);
-    } catch (IOException e) {
-      e = e instanceof RemoteException ?
-          ((RemoteException)e).unwrapRemoteException() : e;
-      LOG.warn("Error while cleaning the logs", e);
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Sort the given list in (descending) order of the space each element takes
-   * @param dirs the list to sort, element in it should be directory (not file)
-   */
-  private void sortByConsumedSpace(List<FileStatus> dirs) {
-    if (dirs == null || dirs.size() < 2) {
-      // no need to sort for empty or single directory
-      return;
-    }
-    dirs.sort(new Comparator<FileStatus>() {
-      HashMap<FileStatus, Long> directorySpaces = new HashMap<>();
-
-      @Override
-      public int compare(FileStatus f1, FileStatus f2) {
-        long f1ConsumedSpace = getSpace(f1);
-        long f2ConsumedSpace = getSpace(f2);
-        return Long.compare(f2ConsumedSpace, f1ConsumedSpace);
-      }
-
-      private long getSpace(FileStatus f) {
-        Long cached = directorySpaces.get(f);
-        if (cached != null) {
-          return cached;
-        }
-        try {
-          long space =
-              f.isDirectory() ? fs.getContentSummary(f.getPath()).getSpaceConsumed() : f.getLen();
-          directorySpaces.put(f, space);
-          return space;
-        } catch (IOException e) {
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("failed to get space consumed by path " + f.getPath(), e);
-          }
-          return -1;
-        }
-      }
-    });
-  }
-
-  /**
-   * Loop over the given directory entries, and check whether they can be deleted.
-   * If an entry is itself a directory it will be recursively checked and deleted itself iff
-   * all subentries are deleted (and no new subentries are added in the mean time)
-   *
-   * @param entries directory entries to check
-   * @return true if all entries were successfully deleted
-   */
-  private boolean checkAndDeleteEntries(FileStatus[] entries) {
-    if (entries == null) {
-      return true;
-    }
-    boolean allEntriesDeleted = true;
-    List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
-    List<FileStatus> dirs = new ArrayList<>();
-    for (FileStatus child : entries) {
-      if (child.isDirectory()) {
-        dirs.add(child);
-      } else {
-        // collect all files to attempt to delete in one batch
-        files.add(child);
-      }
-    }
-    if (dirs.size() > 0) {
-      sortByConsumedSpace(dirs);
-      LOG.debug("Prepared to delete files in directories: " + dirs);
-      for (FileStatus child : dirs) {
-        Path path = child.getPath();
-        // for each subdirectory delete it and all entries if possible
-        if (!checkAndDeleteDirectory(path)) {
-          allEntriesDeleted = false;
-        }
-      }
-    }
-    if (!checkAndDeleteFiles(files)) {
-      allEntriesDeleted = false;
-    }
-    return allEntriesDeleted;
-  }
-
-  /**
-   * Attempt to delete a directory and all files under that directory. Each child file is passed
-   * through the delegates to see if it can be deleted. If the directory has no children when the
-   * cleaners have finished it is deleted.
-   * <p>
-   * If new children files are added between checks of the directory, the directory will not
-   * be deleted.
-   * @param dir directory to check
-   * @return true if the directory was deleted, false otherwise.
-   */
-  @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Checking directory: " + dir);
-    }
-
-    try {
-      FileStatus[] children = FSUtils.listStatus(fs, dir);
-      boolean allChildrenDeleted = checkAndDeleteEntries(children);
-
-      // if the directory still has children, we can't delete it, so we are done
-      if (!allChildrenDeleted) return false;
-    } catch (IOException e) {
-      e = e instanceof RemoteException ?
-          ((RemoteException)e).unwrapRemoteException() : e;
-      LOG.warn("Error while listing directory: " + dir, e);
-      // couldn't list directory, so don't try to delete, and don't return success
-      return false;
-    }
-
-    // otherwise, all the children (that we know about) have been deleted, so we should try to
-    // delete this directory. However, don't do so recursively so we don't delete files that have
-    // been added since we last checked.
-    try {
-      return fs.delete(dir, false);
-    } catch (IOException e) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("Couldn't delete directory: " + dir, e);
-      }
-      // couldn't delete w/o exception, so we can't return success.
-      return false;
-    }
+    CleanerTask task = new CleanerTask(this.oldFileDir, true);
+    cleanerPool.submit(task);
+    return task.join();
   }
 
   /**
@@ -288,6 +208,10 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
    * @return true iff successfully deleted all files
    */
   private boolean checkAndDeleteFiles(List<FileStatus> files) {
+    if (files == null) {
+      return true;
+    }
+
     // first check to see if the path is valid
     List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
     List<FileStatus> invalidFiles = Lists.newArrayList();
@@ -378,4 +302,131 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   public boolean getEnabled() {
     return this.enabled.get();
   }
+
+  private interface Action<T> {
+    T act() throws IOException;
+  }
+
+  private class CleanerTask extends RecursiveTask<Boolean> {
+    private final Path dir;
+    private final boolean root;
+
+    CleanerTask(final FileStatus dir, final boolean root) {
+      this(dir.getPath(), root);
+    }
+
+    CleanerTask(final Path dir, final boolean root) {
+      this.dir = dir;
+      this.root = root;
+    }
+
+    @Override
+    protected Boolean compute() {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("CleanerTask " + Thread.currentThread().getId() +
+            " starts cleaning subdirs, files under " + dir.getName() + " and itself.");
+      }
+
+      List<FileStatus> subDirs;
+      List<FileStatus> files;
+      try {
+        subDirs = getFilteredStatus(status -> status.isDirectory());
+        files = getFilteredStatus(status -> status.isFile());
+      } catch (IOException ioe) {
+        LOG.warn(dir.getName() + " doesn't exist, just skip it.", ioe);
+        return true;
+      }
+
+      boolean nullSubDirs = subDirs == null;
+      if (nullSubDirs) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("There is no subdir under " + dir.getName());
+        }
+      }
+      if (files == null) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("There is no file under " + dir.getName());
+        }
+      }
+
+      int capacity = nullSubDirs ? 0 : subDirs.size();
+      List<CleanerTask> tasks = Lists.newArrayListWithCapacity(capacity);
+      if (!nullSubDirs) {
+        for (FileStatus subdir : subDirs) {
+          CleanerTask task = new CleanerTask(subdir, false);
+          tasks.add(task);
+          task.fork();
+        }
+      }
+
+      boolean result = true;
+      result &= deleteAction(() -> checkAndDeleteFiles(files), "files");
+      result &= deleteAction(() -> getCleanResult(tasks), "subdirs");
+      // if and only if files and subdirs under the current dir are deleted successfully, and
+      // it is not the root dir, then this task will try to delete it.
+      if (result && !root) {
+        result &= deleteAction(() -> fs.delete(dir, false), "dir");
+      }
+      return result;
+    }
+
+    /**
+     * Get FileStatus with filter.
+     * Note that FSUtils#listStatusWithStatusFilter returns null rather than an empty list
+     * when nothing matches the filter.
+     * @param function a filter function
+     * @return filtered FileStatus list, or null if nothing matches
+     * @throws IOException if there's no such directory
+     */
+    private List<FileStatus> getFilteredStatus(Predicate<FileStatus> function) throws IOException {
+      return FSUtils.listStatusWithStatusFilter(fs, dir, status -> function.test(status));
+    }
+
+    /**
+     * Perform a delete on a specified type.
+     * @param delete the delete action to perform
+     * @param type possible values are 'files', 'subdirs', 'dir'
+     * @return true if it deleted successfully, false otherwise
+     */
+    private boolean deleteAction(Action<Boolean> delete, String type) {
+      boolean deleted;
+      String errorMsg = "";
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Start checking deletion of " + type + " under " + dir.getName());
+        }
+        deleted = delete.act();
+      } catch (IOException ioe) {
+        errorMsg = ioe.getMessage();
+        deleted = false;
+      }
+      if (LOG.isDebugEnabled()) {
+        if (deleted) {
+          LOG.debug("Finish deleting " + type + " under " + dir.getName());
+        } else {
+          LOG.debug("Couldn't delete " + type + " completely under " + dir.getName() +
+              " with reason: " + errorMsg);
+        }
+      }
+      return deleted;
+    }
+
+    /**
+     * Get cleaner results of subdirs.
+     * @param tasks subdirs cleaner tasks
+     * @return true if all subdirs deleted successfully, false for partial/all failures
+     * @throws IOException if interrupted or a subtask failed while waiting for results
+     */
+    private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
+      boolean cleaned = true;
+      try {
+        for (CleanerTask task : tasks) {
+          cleaned &= task.get();
+        }
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException(e);
+      }
+      return cleaned;
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
index 566479a6de..9029d4f52f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestCleanerChore.java
@@ -276,11 +276,8 @@ public class TestCleanerChore {
       }
     }).when(spy).isFileDeletable(Mockito.any());
 
-    // attempt to delete the directory, which
-    if (chore.checkAndDeleteDirectory(parent)) {
-      throw new Exception(
-          "Reported success deleting directory, should have failed when adding file mid-iteration");
-    }
+    // run the chore
+    chore.chore();
 
     // make sure all the directories + added file exist, but the original file is deleted
     assertTrue("Added file unexpectedly deleted", fs.exists(racyFile));
-- 
2.13.5 (Apple Git-94)
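
Editor's note (not part of the patch): the change combines two ideas, sizing a shared ForkJoinPool from hbase.logcleaner.threadpool.size, which accepts either an absolute thread count such as "4" or a fraction of the available processors such as "0.5", and cleaning the oldWALs tree bottom-up with one RecursiveTask per subdirectory, deleting a directory only after its files and its subtasks report success. The standalone Java sketch below illustrates that fork/join pattern under simplifying assumptions: it uses java.nio.file instead of Hadoop's FileSystem and omits the pluggable cleaner delegates, and the names ForkJoinCleanerSketch, DeleteTreeTask and poolSize are illustrative only.

import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ForkJoinCleanerSketch {

  // Roughly mirrors the patch's calculatePoolSize(): an integer is used as-is,
  // a fraction such as "0.5" is multiplied by the number of available processors.
  static int poolSize(String configured) {
    if (configured.matches("[1-9][0-9]*")) {
      return Integer.parseInt(configured);
    } else if (configured.matches("0\\.[0-9]+")) {
      int cores = Runtime.getRuntime().availableProcessors();
      return Math.max(1, (int) (cores * Double.parseDouble(configured)));
    }
    return poolSize("0.5"); // fall back to the default ratio
  }

  // One task per directory: fork a child task for each subdirectory, delete the
  // plain files in place, join the children, and finally remove the directory
  // itself only if everything under it was deleted and it is not the root.
  static class DeleteTreeTask extends RecursiveTask<Boolean> {
    private final Path dir;
    private final boolean root;

    DeleteTreeTask(Path dir, boolean root) {
      this.dir = dir;
      this.root = root;
    }

    @Override
    protected Boolean compute() {
      List<DeleteTreeTask> subTasks = new ArrayList<>();
      boolean allDeleted = true;
      try (DirectoryStream<Path> children = Files.newDirectoryStream(dir)) {
        for (Path child : children) {
          if (Files.isDirectory(child)) {
            DeleteTreeTask task = new DeleteTreeTask(child, false);
            subTasks.add(task);
            task.fork();                       // clean subdirectories in parallel
          } else {
            allDeleted &= Files.deleteIfExists(child);
          }
        }
      } catch (IOException e) {
        return false;                          // could not list, so do not claim success
      }
      for (DeleteTreeTask task : subTasks) {
        allDeleted &= task.join();             // wait for the subdirectory results
      }
      if (allDeleted && !root) {
        try {
          Files.deleteIfExists(dir);           // non-recursive, like fs.delete(dir, false)
        } catch (IOException e) {
          return false;
        }
      }
      return allDeleted;
    }
  }

  public static void main(String[] args) {
    ForkJoinPool pool = new ForkJoinPool(poolSize("0.5"));
    boolean cleaned = pool.invoke(new DeleteTreeTask(Paths.get("/tmp/oldWALs-example"), true));
    System.out.println("cleaned = " + cleaned);
  }
}

In the actual patch the per-file deletion also runs each file through the configured cleaner delegates before deleting it, and the ForkJoinPool is a static field shared by all cleaner chores rather than being created per run.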