Index: src/main/java/org/apache/hadoop/hbase/HBaseFileSystem.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/HBaseFileSystem.java (revision 1521321)
+++ src/main/java/org/apache/hadoop/hbase/HBaseFileSystem.java (working copy)
@@ -56,22 +56,22 @@
   }
 
   /**
-   * Deletes a file. Assumes the user has already checked for this directory existence.
+   * Deletes a file. Assumes the user has already checked for this file's existence.
    * @param fs
-   * @param dir
-   * @return true if the directory is deleted.
+   * @param file
+   * @return true if the file is deleted.
    * @throws IOException
    */
-  public static boolean deleteFileFromFileSystem(FileSystem fs, Path dir)
+  public static boolean deleteFileFromFileSystem(FileSystem fs, Path file)
       throws IOException {
     IOException lastIOE = null;
     int i = 0;
     do {
       try {
-        return fs.delete(dir, false);
+        return fs.delete(file, false);
       } catch (IOException ioe) {
         lastIOE = ioe;
-        if (!fs.exists(dir)) return true;
+        if (!fs.exists(file)) return true;
         // dir is there, retry deleting after some time.
         sleepBeforeRetry("Delete File", i + 1);
       }
@@ -81,7 +81,7 @@
 
   /**
-   * Deletes a directory. Assumes the user has already checked for this directory existence.
+   * Deletes a directory. Assumes the user has already checked for this directory's existence.
    * @param fs
    * @param dir
    * @return true if the directory is deleted.
Index: src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (revision 1521321)
+++ src/main/java/org/apache/hadoop/hbase/util/FSUtils.java (working copy)
@@ -56,7 +56,6 @@
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -1262,7 +1261,7 @@
    * @param fs file system
    * @param dir directory
    * @param filter path filter
-   * @return null if tabledir doesn't exist, otherwise FileStatus array
+   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
    */
   public static FileStatus [] listStatus(final FileSystem fs,
       final Path dir, final PathFilter filter) throws IOException {
@@ -1283,7 +1282,7 @@
    *
    * @param fs file system
    * @param dir directory
-   * @return null if tabledir doesn't exist, otherwise FileStatus array
+   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
    */
   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
     return listStatus(fs, dir, null);
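Note on the corrected @return contract above: unlike raw FileSystem.listStatus, the FSUtils wrapper signals both "directory missing" and "directory empty" by returning null rather than throwing FileNotFoundException, so callers must null-check. A minimal caller sketch (class and method names are hypothetical, for illustration only):

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.FSUtils;

// Hypothetical helper, for illustration only.
public class ListStatusExample {
  /** Returns the number of entries under dir, treating "missing" and "empty" alike. */
  static int countEntries(FileSystem fs, Path dir) throws IOException {
    FileStatus[] entries = FSUtils.listStatus(fs, dir);
    // null means the directory is empty or does not exist; no exception is thrown
    if (entries == null) {
      return 0;
    }
    return entries.length;
  }
}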
Index: src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java (revision 1521321)
+++ src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileLinkCleaner.java (working copy)
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hbase.master.cleaner;
 
 import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,11 +26,8 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
 
 /**
  * HFileLink cleaner that determines if a hfile should be deleted.
Index: src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java (revision 1521321)
+++ src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java (working copy)
@@ -19,8 +19,6 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.BaseConfigurable;
 
 /**
  * Base class for the log cleaning function inside the master. By default, two
@@ -38,7 +36,7 @@
  * provide a default constructor.
  */
 @InterfaceAudience.Private
-public abstract class BaseLogCleanerDelegate extends BaseConfigurable implements FileCleanerDelegate {
+public abstract class BaseLogCleanerDelegate extends BaseFileCleanerDelegate {
 
   @Override
   public boolean isFileDeletable(FileStatus fStat) {
Index: src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java (revision 1521321)
+++ src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java (working copy)
@@ -31,9 +31,9 @@
 public interface FileCleanerDelegate extends Configurable, Stoppable {
 
   /**
-   * Should the master delete the file or keep it?
-   * @param fStat file status of the file to check
-   * @return true if the file is deletable, false if not
+   * Determines which of the given files are safe to delete.
+   * @param files files to check for deletion
+   * @return files that are ok to delete according to this cleaner
    */
-  public boolean isFileDeletable(FileStatus fStat);
+  Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files);
 }
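Both BaseLogCleanerDelegate and BaseHFileCleanerDelegate now extend BaseFileCleanerDelegate, which is referenced but not included in this patch excerpt. Presumably it adapts the old per-file isFileDeletable() hook to the new batch getDeletableFiles() interface via Guava filtering, the same idiom this patch uses elsewhere. A plausible sketch, not the verbatim class:

package org.apache.hadoop.hbase.master.cleaner;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.BaseConfigurable;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;

// Sketch of the referenced-but-not-shown adapter; the committed class may differ.
public abstract class BaseFileCleanerDelegate extends BaseConfigurable
    implements FileCleanerDelegate {

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    // lazily narrow the batch using the subclass's per-file decision
    return Iterables.filter(files, new Predicate<FileStatus>() {
      @Override
      public boolean apply(FileStatus file) {
        return isFileDeletable(file);
      }
    });
  }

  /** Subclasses keep implementing the simple single-file check. */
  protected abstract boolean isFileDeletable(FileStatus fStat);
}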
Index: src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java (revision 1521321)
+++ src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java (working copy)
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.cleaner;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.BaseConfigurable;
 
 /**
  * Base class for the hfile cleaning function inside the master. By default, only the
@@ -36,8 +35,7 @@
  * provide a default constructor.
  */
 @InterfaceAudience.Private
-public abstract class BaseHFileCleanerDelegate extends BaseConfigurable implements
-    FileCleanerDelegate {
+public abstract class BaseHFileCleanerDelegate extends BaseFileCleanerDelegate {
 
   private boolean stopped = false;
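For a concrete delegate, per-file logic is usually enough: subclasses keep overriding isFileDeletable() and inherit the batch adaptation from the base class. A hypothetical TTL-style cleaner, loosely modeled on HBase's TimeToLiveHFileCleaner (the class name and TTL constant are illustrative, not part of this patch):

package org.apache.hadoop.hbase.master.cleaner;

import org.apache.hadoop.fs.FileStatus;

// Hypothetical delegate, for illustration only.
public class ExampleTtlHFileCleaner extends BaseHFileCleanerDelegate {

  private static final long TTL_MS = 60 * 60 * 1000L; // arbitrary one-hour TTL

  @Override
  public boolean isFileDeletable(FileStatus fStat) {
    long age = System.currentTimeMillis() - fStat.getModificationTime();
    // only files that have sat in the archive longer than the TTL are deletable
    return age > TTL_MS;
  }
}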
Index: src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java (revision 1521321)
+++ src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java (working copy)
@@ -33,6 +33,11 @@
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.util.FSUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
 /**
  * Abstract Cleaner that uses a chain of delegates to clean a directory of files
  * @param <T> Cleaner delegate class that is dynamically loaded from configuration
@@ -74,7 +79,7 @@
   protected abstract boolean validate(Path file);
 
   /**
-   * Instanitate and initialize all the file cleaners set in the configuration
+   * Instantiate and initialize all the file cleaners set in the configuration
    * @param confKey key to get the file cleaner classes from the configuration
    */
   private void initCleanerChain(String confKey) {
@@ -98,7 +103,7 @@
    * @param conf
    * @return the new instance
    */
-  public T newFileCleaner(String className, Configuration conf) {
+  private T newFileCleaner(String className, Configuration conf) {
     try {
       Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
         FileCleanerDelegate.class);
@@ -116,131 +121,153 @@
 
   @Override
   protected void chore() {
     try {
-      FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir, null);
-      // if the path (file or directory) doesn't exist, then we can just return
-      if (files == null) return;
-      // loop over the found files and see if they should be deleted
-      for (FileStatus file : files) {
-        try {
-          if (file.isDir()) checkAndDeleteDirectory(file.getPath());
-          else checkAndDelete(file);
-        } catch (IOException e) {
-          e = RemoteExceptionHandler.checkIOException(e);
-          LOG.warn("Error while cleaning the logs", e);
-        }
-      }
+      FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
+      checkAndDeleteEntries(files);
     } catch (IOException e) {
-      LOG.warn("Failed to get status of:" + oldFileDir);
+      e = RemoteExceptionHandler.checkIOException(e);
+      LOG.warn("Error while cleaning the logs", e);
     }
-
   }
 
   /**
+   * Loop over the given directory entries, and check whether they can be deleted.
+   * If an entry is itself a directory it will be recursively checked, and deleted iff
+   * all of its subentries are deleted (and no new subentries are added in the meantime).
+   *
+   * @param entries directory entries to check
+   * @return true if all entries were successfully deleted
+   */
+  private boolean checkAndDeleteEntries(FileStatus[] entries) {
+    if (entries == null) {
+      return true;
+    }
+    boolean allEntriesDeleted = true;
+    List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
+    for (FileStatus child : entries) {
+      Path path = child.getPath();
+      if (child.isDir()) {
+        // for each subdirectory delete it and all entries if possible
+        if (!checkAndDeleteDirectory(path)) {
+          allEntriesDeleted = false;
+        }
+      } else {
+        // collect all files to attempt to delete in one batch
+        files.add(child);
+      }
+    }
+    if (!checkAndDeleteFiles(files)) {
+      allEntriesDeleted = false;
+    }
+    return allEntriesDeleted;
+  }
+
+  /**
    * Attempt to delete a directory and all files under that directory. Each child file is passed
-   * through the delegates to see if it can be deleted. If the directory has not children when the
+   * through the delegates to see if it can be deleted. If the directory has no children when the
    * cleaners have finished it is deleted.
    * <p>
    * If new children files are added between checks of the directory, the directory will not
    * be deleted.
-   * @param toCheck directory to check
+   * @param dir directory to check
    * @return true if the directory was deleted, false otherwise.
-   * @throws IOException if there is an unexpected filesystem error
    */
-  public boolean checkAndDeleteDirectory(Path toCheck) throws IOException {
+  @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Checking directory: " + toCheck);
+      LOG.trace("Checking directory: " + dir);
     }
-    FileStatus[] children = FSUtils.listStatus(fs, toCheck, null);
-    // if the directory doesn't exist, then we are done
-    if (children == null) {
-      try {
-        return HBaseFileSystem.deleteFileFromFileSystem(fs, toCheck);
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Couldn't delete directory: " + toCheck, e);
-        }
-      }
-      // couldn't delete w/o exception, so we can't return success.
+
+    try {
+      FileStatus[] children = FSUtils.listStatus(fs, dir);
+      boolean allChildrenDeleted = checkAndDeleteEntries(children);
+
+      // if the directory still has children, we can't delete it, so we are done
+      if (!allChildrenDeleted) return false;
+    } catch (IOException e) {
+      e = RemoteExceptionHandler.checkIOException(e);
+      LOG.warn("Error while listing directory: " + dir, e);
+      // couldn't list directory, so don't try to delete, and don't return success
       return false;
-    }
-
-    boolean canDeleteThis = true;
-    for (FileStatus child : children) {
-      Path path = child.getPath();
-      // attempt to delete all the files under the directory
-      if (child.isDir()) {
-        if (!checkAndDeleteDirectory(path)) {
-          canDeleteThis = false;
-        }
-      }
-      // otherwise we can just check the file
-      else if (!checkAndDelete(child)) {
-        canDeleteThis = false;
-      }
     }
 
-    // if the directory has children, we can't delete it, so we are done
-    if (!canDeleteThis) return false;
-
     // otherwise, all the children (that we know about) have been deleted, so we should try to
     // delete this directory. However, don't do so recursively so we don't delete files that have
     // been added since we last checked.
     try {
-      return HBaseFileSystem.deleteFileFromFileSystem(fs, toCheck);
+      return HBaseFileSystem.deleteFileFromFileSystem(fs, dir);
     } catch (IOException e) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Couldn't delete directory: " + toCheck, e);
+        LOG.trace("Couldn't delete directory: " + dir, e);
       }
+      // couldn't delete w/o exception, so we can't return success.
+      return false;
     }
-
-    // couldn't delete w/o exception, so we can't return success.
-    return false;
   }
 
   /**
-   * Run the given file through each of the cleaners to see if it should be deleted, deleting it if
+   * Run the given files through each of the cleaners to see if they should be deleted, deleting them if
    * necessary.
-   * @param fStat path of the file to check (and possibly delete)
-   * @throws IOException if cann't delete a file because of a filesystem issue
-   * @throws IllegalArgumentException if the file is a directory and has children
+   * @param files List of FileStatus for the files to check (and possibly delete)
+   * @return true iff successfully deleted all files
    */
-  private boolean checkAndDelete(FileStatus fStat) throws IOException, IllegalArgumentException {
-    Path filePath = fStat.getPath();
+  private boolean checkAndDeleteFiles(List<FileStatus> files) {
     // first check to see if the path is valid
-    if (!validate(filePath)) {
-      LOG.warn("Found a wrongly formatted file: " + filePath.getName() + " deleting it.");
-      boolean success = HBaseFileSystem.deleteDirFromFileSystem(fs, filePath);
-      if (!success) LOG.warn("Attempted to delete:" + filePath
-          + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
+    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
+    List<FileStatus> invalidFiles = Lists.newArrayList();
+    for (FileStatus file : files) {
+      if (validate(file.getPath())) {
+        validFiles.add(file);
+      } else {
+        LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
+        invalidFiles.add(file);
+      }
+    }
 
-      return success;
-    }
-    // check each of the cleaners for the file
+    Iterable<FileStatus> deletableValidFiles = validFiles;
+    // check each of the cleaners for the valid files
     for (T cleaner : cleanersChain) {
       if (cleaner.isStopped() || this.stopper.isStopped()) {
-        LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any file in:"
+        LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
            + this.oldFileDir);
         return false;
      }
 
-      if (!cleaner.isFileDeletable(fStat)) {
-        // this file is not deletable, then we are done
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(filePath + " is not deletable according to:" + cleaner);
+      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
+
+      // trace which cleaner is holding on to each file
+      if (LOG.isTraceEnabled()) {
+        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
+        for (FileStatus file : deletableValidFiles) {
+          if (!filteredFileSet.contains(file)) {
+            LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
+          }
         }
-        return false;
       }
+
+      deletableValidFiles = filteredFiles;
     }
 
-    // delete this file if it passes all the cleaners
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Removing:" + filePath + " from archive");
+    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
+    int deletedFileCount = 0;
+    for (FileStatus file : filesToDelete) {
+      Path filePath = file.getPath();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Removing: " + filePath + " from archive");
+      }
+      try {
+        boolean success = HBaseFileSystem.deleteFileFromFileSystem(fs, filePath);
+        if (success) {
+          deletedFileCount++;
+        } else {
+          LOG.warn("Attempted to delete:" + filePath
+              + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
        }
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        LOG.warn("Error while deleting: " + filePath, e);
+      }
     }
-    boolean success = HBaseFileSystem.deleteFileFromFileSystem(fs, filePath);
-    if (!success) {
-      LOG.warn("Attempted to delete:" + filePath
-          + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
-    }
-    return success;
+
+    return deletedFileCount == files.size();
   }
 
   @Override
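A note on the new checkAndDeleteFiles() loop above: Guava's Iterables.filter is lazy, so feeding each cleaner's getDeletableFiles() result into the next composes the predicates without copying the file list; the trace branch deliberately materializes an ImmutableSet so membership checks are cheap and the lazy view is evaluated only once. A standalone sketch of the idiom (not HBase code; file names invented):

import java.util.List;

import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;

// Standalone illustration of the lazy chain-narrowing idiom, for illustration only.
public class ChainFilterExample {
  public static void main(String[] args) {
    Iterable<String> candidates = ImmutableList.of("a.log", "b.log", "c.tmp");

    // first "cleaner": drop temp files
    Iterable<String> stage1 = Iterables.filter(candidates, new Predicate<String>() {
      @Override
      public boolean apply(String f) {
        return !f.endsWith(".tmp");
      }
    });

    // second "cleaner": drop files some subsystem still references
    final List<String> stillReferenced = ImmutableList.of("b.log");
    Iterable<String> stage2 = Iterables.filter(stage1, new Predicate<String>() {
      @Override
      public boolean apply(String f) {
        return !stillReferenced.contains(f);
      }
    });

    // the predicates only run here, once per element, when the view is iterated
    for (String deletable : stage2) {
      System.out.println("would delete: " + deletable); // prints only a.log
    }
  }
}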
Index: src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (revision 1521321)
+++ src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (working copy)
@@ -19,32 +19,36 @@
  */
 package org.apache.hadoop.hbase.replication.master;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.BaseConfigurable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
+import org.apache.hadoop.hbase.master.cleaner.FileCleanerDelegate;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
 
 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for
  * replication before deleting it when its TTL is over.
  */
-public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable {
+public class ReplicationLogCleaner extends BaseConfigurable implements FileCleanerDelegate, Abortable {
   private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
   private ReplicationZookeeper zkHelper;
-  private Set<String> hlogs = new HashSet<String>();
   private boolean stopped = false;
   private boolean aborted;
 
@@ -54,51 +58,49 @@
   public ReplicationLogCleaner() {}
 
   @Override
-  public boolean isLogDeletable(FileStatus fStat) {
-
+  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
     try {
       if (!zkHelper.getReplication()) {
-        return false;
+        return ImmutableList.of();
      }
     } catch (KeeperException e) {
       abort("Cannot get the state of replication", e);
-      return false;
+      return ImmutableList.of();
     }
 
-    // all members of this class are null if replication is disabled, and we
-    // return true since false would render the LogsCleaner useless
+    // all members of this class are null if replication is disabled,
+    // so we cannot filter the files
     if (this.getConf() == null) {
-      return true;
+      return files;
     }
 
-    String log = fStat.getPath().getName();
-    // If we saw the hlog previously, let's consider it's still used
-    // At some point in the future we will refresh the list and it will be gone
-    if (this.hlogs.contains(log)) {
-      return false;
-    }
-
-    // Let's see it's still there
-    // This solution makes every miss very expensive to process since we
-    // almost completely refresh the cache each time
-    return !refreshHLogsAndSearch(log);
+    final Set<String> hlogs = loadHLogsFromQueues();
+    return Iterables.filter(files, new Predicate<FileStatus>() {
+      @Override
+      public boolean apply(FileStatus file) {
+        String hlog = file.getPath().getName();
+        boolean logInReplicationQueue = hlogs.contains(hlog);
+        if (LOG.isDebugEnabled()) {
+          if (logInReplicationQueue) {
+            LOG.debug("Found log in ZK, keeping: " + hlog);
+          } else {
+            LOG.debug("Didn't find this log in ZK, deleting: " + hlog);
+          }
+        }
+        return !logInReplicationQueue;
+      }});
   }
 
   /**
-   * Search through all the hlogs we have in ZK to refresh the cache
-   * If a log is specified and found, then we early out and return true
-   * @param searchedLog log we are searching for, pass null to cache everything
-   *                    that's in zookeeper.
-   * @return false until a specified log is found.
+   * Load all hlogs in all replication queues from ZK
    */
-  private boolean refreshHLogsAndSearch(String searchedLog) {
-    this.hlogs.clear();
-    final boolean lookForLog = searchedLog != null;
+  private Set<String> loadHLogsFromQueues() {
     List<String> rss = zkHelper.getListOfReplicators();
     if (rss == null) {
-      LOG.debug("Didn't find any region server that replicates, deleting: " +
-          searchedLog);
-      return false;
+      LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
+      return ImmutableSet.of();
     }
+    Set<String> hlogs = Sets.newHashSet();
     for (String rs: rss) {
       List<String> listOfPeers = zkHelper.getListPeersForRS(rs);
       // if rs just died, this will be null
@@ -108,23 +110,18 @@
       for (String id : listOfPeers) {
         List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id);
         if (peersHlogs != null) {
-          this.hlogs.addAll(peersHlogs);
+          hlogs.addAll(peersHlogs);
         }
-        // early exit if we found the log
-        if(lookForLog && this.hlogs.contains(searchedLog)) {
-          LOG.debug("Found log in ZK, keeping: " + searchedLog);
-          return true;
-        }
       }
     }
-    LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
-    return false;
+    return hlogs;
   }
 
   @Override
   public void setConf(Configuration config) {
     // If replication is disabled, keep all members null
     if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
+      LOG.warn("Not configured - allowing all hlogs to be deleted");
       return;
     }
     // Make my own Configuration.  Then I'll have my own connection to zk that
@@ -139,7 +136,6 @@
     } catch (IOException e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     }
-    refreshHLogsAndSearch(null);
   }
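The rewritten ReplicationLogCleaner trades the old per-miss cache refresh (a near-complete ZK scan for every file not found in the cache) for a single snapshot of all replication queues per batch. A hypothetical driver showing the new call pattern (the wrapper class and method are illustrative, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;

// Hypothetical driver, for illustration only.
public class ReplicationCleanerExample {
  static Iterable<FileStatus> findDeletable(Configuration conf,
      Iterable<FileStatus> candidateLogs) {
    ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
    // with hbase.replication=true this wires up the ZK helper; the queues are
    // then read once per getDeletableFiles() call, not once per file miss
    cleaner.setConf(conf);
    return cleaner.getDeletableFiles(candidateLogs);
  }
}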