+ List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
+ for (FileStatus child : entries) {
+ Path path = child.getPath();
+ if (child.isDir()) {
+ // for each subdirectory, attempt to delete it and all entries under it
+ if (!checkAndDeleteDirectory(path)) {
+ allEntriesDeleted = false;
+ }
+ } else {
+ // collect all files to attempt to delete in one batch
+ files.add(child);
+ }
+ }
+ if (!checkAndDeleteFiles(files)) {
+ allEntriesDeleted = false;
+ }
+ return allEntriesDeleted;
+ }
+
+ /**
* Attempt to delete a directory and all files under that directory. Each child file is passed
- * through the delegates to see if it can be deleted. If the directory has not children when the
+ * through the delegates to see if it can be deleted. If the directory has no children when the
* cleaners have finished it is deleted.
*
* If new children files are added between checks of the directory, the directory will not
* be deleted.
- * @param toCheck directory to check
+ * @param dir directory to check
* @return true if the directory was deleted, false otherwise.
- * @throws IOException if there is an unexpected filesystem error
*/
- public boolean checkAndDeleteDirectory(Path toCheck) throws IOException {
+ @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Checking directory: " + toCheck);
+ LOG.trace("Checking directory: " + dir);
}
- FileStatus[] children = FSUtils.listStatus(fs, toCheck, null);
- // if the directory doesn't exist, then we are done
- if (children == null) {
- try {
- return HBaseFileSystem.deleteFileFromFileSystem(fs, toCheck);
- } catch (IOException e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Couldn't delete directory: " + toCheck, e);
- }
- }
- // couldn't delete w/o exception, so we can't return success.
+
+ try {
+ FileStatus[] children = FSUtils.listStatus(fs, dir);
+ boolean allChildrenDeleted = checkAndDeleteEntries(children);
+
+ // if the directory still has children, we can't delete it, so we are done
+ if (!allChildrenDeleted) return false;
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ LOG.warn("Error while listing directory: " + dir, e);
+ // couldn't list directory, so don't try to delete, and don't return success
return false;
- }
-
- boolean canDeleteThis = true;
- for (FileStatus child : children) {
- Path path = child.getPath();
- // attempt to delete all the files under the directory
- if (child.isDir()) {
- if (!checkAndDeleteDirectory(path)) {
- canDeleteThis = false;
- }
- }
- // otherwise we can just check the file
- else if (!checkAndDelete(child)) {
- canDeleteThis = false;
- }
}
- // if the directory has children, we can't delete it, so we are done
- if (!canDeleteThis) return false;
-
// otherwise, all the children (that we know about) have been deleted, so we should try to
// delete this directory. However, don't do so recursively so we don't delete files that have
// been added since we last checked.
try {
- return HBaseFileSystem.deleteFileFromFileSystem(fs, toCheck);
+ return HBaseFileSystem.deleteFileFromFileSystem(fs, dir);
} catch (IOException e) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Couldn't delete directory: " + toCheck, e);
+ LOG.trace("Couldn't delete directory: " + dir, e);
}
+ // couldn't delete w/o exception, so we can't return success.
+ return false;
}
-
- // couldn't delete w/o exception, so we can't return success.
- return false;
}
/**
- * Run the given file through each of the cleaners to see if it should be deleted, deleting it if
+ * Run the given files through each of the cleaners to see if they should be deleted, deleting them if
* necessary.
- * @param fStat path of the file to check (and possibly delete)
- * @throws IOException if cann't delete a file because of a filesystem issue
- * @throws IllegalArgumentException if the file is a directory and has children
+ * @param files List of FileStatus for the files to check (and possibly delete)
+ * @return true iff successfully deleted all files
*/
- private boolean checkAndDelete(FileStatus fStat) throws IOException, IllegalArgumentException {
- Path filePath = fStat.getPath();
+ private boolean checkAndDeleteFiles(List<FileStatus> files) {
// first check to see if the path is valid
- if (!validate(filePath)) {
- LOG.warn("Found a wrongly formatted file: " + filePath.getName() + " deleting it.");
- boolean success = HBaseFileSystem.deleteDirFromFileSystem(fs, filePath);
- if (!success) LOG.warn("Attempted to delete:" + filePath
- + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
+ List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
+ List<FileStatus> invalidFiles = Lists.newArrayList();
+ for (FileStatus file : files) {
+ if (validate(file.getPath())) {
+ validFiles.add(file);
+ } else {
+ LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
+ invalidFiles.add(file);
+ }
+ }
- return success;
- }
- // check each of the cleaners for the file
+ Iterable<FileStatus> deletableValidFiles = validFiles;
+ // check each of the cleaners for the valid files
for (T cleaner : cleanersChain) {
if (cleaner.isStopped() || this.stopper.isStopped()) {
- LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any file in:"
+ LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any more files in:"
+ this.oldFileDir);
return false;
}
- if (!cleaner.isFileDeletable(fStat)) {
- // this file is not deletable, then we are done
- if (LOG.isTraceEnabled()) {
- LOG.trace(filePath + " is not deletable according to:" + cleaner);
+ Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
+
+ // trace which cleaner is holding on to each file
+ if (LOG.isTraceEnabled()) {
+ ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
+ for (FileStatus file : deletableValidFiles) {
+ if (!filteredFileSet.contains(file)) {
+ LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
+ }
}
- return false;
}
+
+ deletableValidFiles = filteredFiles;
}
- // delete this file if it passes all the cleaners
- if (LOG.isTraceEnabled()) {
- LOG.trace("Removing:" + filePath + " from archive");
+
+ Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
+ int deletedFileCount = 0;
+ for (FileStatus file : filesToDelete) {
+ Path filePath = file.getPath();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Removing: " + filePath + " from archive");
+ }
+ try {
+ boolean success = HBaseFileSystem.deleteFileFromFileSystem(fs, filePath);
+ if (success) {
+ deletedFileCount++;
+ } else {
+ LOG.warn("Attempted to delete:" + filePath
+ + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
+ }
+ } catch (IOException e) {
+ e = RemoteExceptionHandler.checkIOException(e);
+ LOG.warn("Error while deleting: " + filePath, e);
+ }
}
- boolean success = HBaseFileSystem.deleteFileFromFileSystem(fs, filePath);
- if (!success) {
- LOG.warn("Attempted to delete:" + filePath
- + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
- }
- return success;
+
+ return deletedFileCount == files.size();
}
@Override
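
For a sense of how a delegate looks against the new batch contract, here is a minimal sketch (illustrative only: the class name, TTL field, and threshold are hypothetical, and it assumes FileCleanerDelegate extends Configurable and Stoppable, as the chore's isStopped() check above suggests):

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.BaseConfigurable;
import org.apache.hadoop.hbase.master.cleaner.FileCleanerDelegate;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;

public class TtlFileCleaner extends BaseConfigurable implements FileCleanerDelegate {
  // hypothetical: minimum age in ms before a file may be deleted
  private long ttl = 60000;

  @Override
  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
    final long now = System.currentTimeMillis();
    // return a lazy view; the chore only materializes it for trace logging
    return Iterables.filter(files, new Predicate<FileStatus>() {
      @Override
      public boolean apply(FileStatus file) {
        return now - file.getModificationTime() > ttl;
      }
    });
  }

  @Override
  public void stop(String why) {}

  @Override
  public boolean isStopped() {
    return false;
  }
}
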
Index: src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (revision 1521321)
+++ src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (working copy)
@@ -19,32 +19,36 @@
*/
package org.apache.hadoop.hbase.replication.master;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.BaseConfigurable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
+import org.apache.hadoop.hbase.master.cleaner.FileCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
/**
* Implementation of a log cleaner that checks if a log is still scheduled for
* replication before deleting it when its TTL is over.
*/
-public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abortable {
+public class ReplicationLogCleaner extends BaseConfigurable implements FileCleanerDelegate, Abortable {
private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
private ReplicationZookeeper zkHelper;
- private Set<String> hlogs = new HashSet<String>();
private boolean stopped = false;
private boolean aborted;
@@ -54,51 +58,49 @@
public ReplicationLogCleaner() {}
@Override
- public boolean isLogDeletable(FileStatus fStat) {
-
+ public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
try {
if (!zkHelper.getReplication()) {
- return false;
+ return ImmutableList.of();
}
} catch (KeeperException e) {
abort("Cannot get the state of replication", e);
- return false;
+ return ImmutableList.of();
}
- // all members of this class are null if replication is disabled, and we
- // return true since false would render the LogsCleaner useless
+ // all members of this class are null if replication is disabled,
+ // so we cannot filter the files
if (this.getConf() == null) {
- return true;
+ return files;
}
- String log = fStat.getPath().getName();
- // If we saw the hlog previously, let's consider it's still used
- // At some point in the future we will refresh the list and it will be gone
- if (this.hlogs.contains(log)) {
- return false;
- }
-
- // Let's see it's still there
- // This solution makes every miss very expensive to process since we
- // almost completely refresh the cache each time
- return !refreshHLogsAndSearch(log);
+
+ final Set<String> hlogs = loadHLogsFromQueues();
+ return Iterables.filter(files, new Predicate<FileStatus>() {
+ @Override
+ public boolean apply(FileStatus file) {
+ String hlog = file.getPath().getName();
+ boolean logInReplicationQueue = hlogs.contains(hlog);
+ if (LOG.isDebugEnabled()) {
+ if (logInReplicationQueue) {
+ LOG.debug("Found log in ZK, keeping: " + hlog);
+ } else {
+ LOG.debug("Didn't find this log in ZK, deleting: " + hlog);
+ }
+ }
+ return !logInReplicationQueue;
+ }});
}
/**
- * Search through all the hlogs we have in ZK to refresh the cache
- * If a log is specified and found, then we early out and return true
- * @param searchedLog log we are searching for, pass null to cache everything
- * that's in zookeeper.
- * @return false until a specified log is found.
+ * Load all hlogs in all replication queues from ZK
*/
- private boolean refreshHLogsAndSearch(String searchedLog) {
- this.hlogs.clear();
- final boolean lookForLog = searchedLog != null;
+ private Set<String> loadHLogsFromQueues() {
List<String> rss = zkHelper.getListOfReplicators();
if (rss == null) {
- LOG.debug("Didn't find any region server that replicates, deleting: " +
- searchedLog);
- return false;
+ LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
+ return ImmutableSet.of();
}
+ Set<String> hlogs = Sets.newHashSet();
for (String rs: rss) {
List<String> listOfPeers = zkHelper.getListPeersForRS(rs);
// if rs just died, this will be null
@@ -108,23 +110,18 @@
for (String id : listOfPeers) {
List<String> peersHlogs = zkHelper.getListHLogsForPeerForRS(rs, id);
if (peersHlogs != null) {
- this.hlogs.addAll(peersHlogs);
+ hlogs.addAll(peersHlogs);
}
- // early exit if we found the log
- if(lookForLog && this.hlogs.contains(searchedLog)) {
- LOG.debug("Found log in ZK, keeping: " + searchedLog);
- return true;
- }
}
}
- LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
- return false;
+ return hlogs;
}
@Override
public void setConf(Configuration config) {
// If replication is disabled, keep all members null
if (!config.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
+ LOG.warn("Not configured - allowing all hlogs to be deleted");
return;
}
// Make my own Configuration. Then I'll have my own connection to zk that
@@ -139,7 +136,6 @@
} catch (IOException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
}
- refreshHLogsAndSearch(null);
}