diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java
new file mode 100644
index 0000000..966639d
--- /dev/null
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.BaseConfigurable;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+
+public abstract class BaseFileCleanerDelegate extends BaseConfigurable
+implements FileCleanerDelegate {
+
+  @Override
+  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+    return Iterables.filter(files, new Predicate<FileStatus>() {
+      @Override
+      public boolean apply(FileStatus file) {
+        return isFileDeletable(file);
+      }});
+  }
+
+  /**
+   * Should the master delete the file or keep it?
+   * @param fStat file status of the file to check
+   * @return true if the file is deletable, false if not
+   */
+  protected abstract boolean isFileDeletable(FileStatus fStat);
+
+}
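With this adapter in place, a per-file delegate only overrides isFileDeletable(); the batch getDeletableFiles() view is derived for it through Guava's lazy Iterables.filter(). A minimal sketch of a direct subclass (the class name and the *.tmp rule are made up for illustration, not part of this patch):

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.master.cleaner.BaseFileCleanerDelegate;

// Hypothetical delegate: treat anything named *.tmp as safe to delete.
public class ExampleTmpFileCleaner extends BaseFileCleanerDelegate {

  @Override
  protected boolean isFileDeletable(FileStatus fStat) {
    // the inherited getDeletableFiles() applies this test lazily, per file
    return fStat.getPath().getName().endsWith(".tmp");
  }

  // FileCleanerDelegate extends Stoppable, so a direct subclass still has to
  // supply these; BaseHFileCleanerDelegate (below) provides them for you.
  @Override
  public void stop(String why) {
    // nothing to release in this sketch
  }

  @Override
  public boolean isStopped() {
    return false;
  }
}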
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java
index b89334d..a260a79 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseHFileCleanerDelegate.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.cleaner;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.BaseConfigurable;
 
 /**
  * Base class for the hfile cleaning function inside the master. By default, only the
@@ -36,8 +35,7 @@ import org.apache.hadoop.hbase.BaseConfigurable;
  * provide a default constructor.
 */
 @InterfaceAudience.Private
-public abstract class BaseHFileCleanerDelegate extends BaseConfigurable implements
-    FileCleanerDelegate {
+public abstract class BaseHFileCleanerDelegate extends BaseFileCleanerDelegate {
 
   private boolean stopped = false;
 
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java
index 9da8ad4..847a2a7 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseLogCleanerDelegate.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hbase.master.cleaner;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.BaseConfigurable;
 
 /**
  * Base class for the log cleaning function inside the master. By default, two
@@ -38,7 +36,7 @@ import org.apache.hadoop.hbase.BaseConfigurable;
  * provide a default constructor.
 */
 @InterfaceAudience.Private
-public abstract class BaseLogCleanerDelegate extends BaseConfigurable implements FileCleanerDelegate {
+public abstract class BaseLogCleanerDelegate extends BaseFileCleanerDelegate {
 
   @Override
   public boolean isFileDeletable(FileStatus fStat) {
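Because both base classes now route through BaseFileCleanerDelegate, existing delegates keep their per-file contract: override isFileDeletable() and the batch method comes for free. A sketch of a configurable HFile cleaner (the class name and config key are hypothetical; real plugins are registered through hbase.master.hfilecleaner.plugins):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;

// Hypothetical cleaner: keep archived hfiles for a configurable grace period.
public class ExampleGracePeriodHFileCleaner extends BaseHFileCleanerDelegate {
  private static final String GRACE_KEY = "example.hfilecleaner.grace.ms"; // made-up key
  private long graceMs;

  @Override
  public void setConf(Configuration conf) {
    super.setConf(conf);
    this.graceMs = conf.getLong(GRACE_KEY, 60 * 60 * 1000L);
  }

  @Override
  protected boolean isFileDeletable(FileStatus fStat) {
    // deletable once the file has sat in the archive longer than the grace period
    return System.currentTimeMillis() - fStat.getModificationTime() > graceMs;
  }
}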
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index 57cd4cb..11e0883 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -32,6 +32,11 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.util.FSUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
 /**
  * Abstract Cleaner that uses a chain of delegates to clean a directory of files
  * @param <T> Cleaner delegate class that is dynamically loaded from configuration
@@ -97,7 +102,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Chore
    * @param conf
    * @return the new instance
    */
-  public T newFileCleaner(String className, Configuration conf) {
+  private T newFileCleaner(String className, Configuration conf) {
    try {
      Class<? extends FileCleanerDelegate> c = Class.forName(className).asSubclass(
        FileCleanerDelegate.class);
@@ -115,135 +120,157 @@
   @Override
   protected void chore() {
     try {
-      FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir, null);
-      // if the path (file or directory) doesn't exist, then we can just return
-      if (files == null) return;
-      // loop over the found files and see if they should be deleted
-      for (FileStatus file : files) {
-        try {
-          if (file.isDir()) checkAndDeleteDirectory(file.getPath());
-          else checkAndDelete(file);
-        } catch (IOException e) {
-          e = RemoteExceptionHandler.checkIOException(e);
-          LOG.warn("Error while cleaning the logs", e);
-        }
-      }
+      FileStatus[] files = FSUtils.listStatus(this.fs, this.oldFileDir);
+      checkAndDeleteEntries(files);
     } catch (IOException e) {
-      LOG.warn("Failed to get status of: " + oldFileDir);
+      e = RemoteExceptionHandler.checkIOException(e);
+      LOG.warn("Error while cleaning the logs", e);
     }
-
   }
 
   /**
+   * Loop over the given directory entries, and check whether they can be deleted.
+   * If an entry is itself a directory it is checked recursively, and is deleted
+   * only if all of its sub-entries were deleted (and no new entries were added in the meantime).
+   *
+   * @param entries directory entries to check
+   * @return true if all entries were successfully deleted
+   */
+  private boolean checkAndDeleteEntries(FileStatus[] entries) {
+    boolean allEntriesDeleted = true;
+    if (entries != null) {
+      List<FileStatus> files = Lists.newArrayListWithCapacity(entries.length);
+      for (FileStatus child : entries) {
+        Path path = child.getPath();
+        // attempt to delete all the files under the directory
+        if (child.isDir()) {
+          if (!checkAndDeleteDirectory(path)) {
+            allEntriesDeleted = false;
+          }
+        }
+        // otherwise we can just check the file
+        else {
+          files.add(child);
+        }
+      }
+      if (!checkAndDeleteFiles(files)) {
+        allEntriesDeleted = false;
+      }
+    }
+    return allEntriesDeleted;
+  }
+
+  /**
    * Attempt to delete a directory and all files under that directory. Each child file is passed
    * through the delegates to see if it can be deleted. If the directory has no children when the
    * cleaners have finished it is deleted.
    * <p>
    * If new children files are added between checks of the directory, the directory will not
    * be deleted.
-   * @param toCheck directory to check
+   * @param dir directory to check
    * @return true if the directory was deleted, false otherwise.
    * @throws IOException if there is an unexpected filesystem error
    */
-  public boolean checkAndDeleteDirectory(Path toCheck) throws IOException {
+  @VisibleForTesting boolean checkAndDeleteDirectory(Path dir) {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Checking directory: " + toCheck);
-    }
-    FileStatus[] children = FSUtils.listStatus(fs, toCheck);
-    // if the directory doesn't exist, then we are done
-    if (children == null) {
-      try {
-        return fs.delete(toCheck, false);
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Couldn't delete directory: " + toCheck, e);
-        }
-      }
-      // couldn't delete w/o exception, so we can't return success.
-      return false;
+      LOG.trace("Checking directory: " + dir);
     }
-    boolean canDeleteThis = true;
-    for (FileStatus child : children) {
-      Path path = child.getPath();
-      // attempt to delete all the files under the directory
-      if (child.isDir()) {
-        if (!checkAndDeleteDirectory(path)) {
-          canDeleteThis = false;
-        }
-      }
-      // otherwise we can just check the file
-      else if (!checkAndDelete(child)) {
-        canDeleteThis = false;
-      }
+    try {
+      FileStatus[] children = FSUtils.listStatus(fs, dir);
+      boolean allChildrenDeleted = checkAndDeleteEntries(children);
+
+      // if the directory still has children, we can't delete it, so we are done
+      if (!allChildrenDeleted) return false;
+    } catch (IOException e) {
+      e = RemoteExceptionHandler.checkIOException(e);
+      LOG.warn("Error while listing directory: " + dir, e);
+      // couldn't list directory, so don't try to delete, and don't return success
+      return false;
     }
-    // if the directory has children, we can't delete it, so we are done
-    if (!canDeleteThis) return false;
 
     // otherwise, all the children (that we know about) have been deleted, so we should try to
     // delete this directory. However, don't do so recursively so we don't delete files that have
     // been added since we last checked.
     try {
-      return fs.delete(toCheck, false);
+      return fs.delete(dir, false);
     } catch (IOException e) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace("Couldn't delete directory: " + toCheck, e);
+        LOG.trace("Couldn't delete directory: " + dir, e);
       }
+      // couldn't delete w/o exception, so we can't return success.
+      return false;
     }
-
-    // couldn't delete w/o exception, so we can't return success.
-    return false;
   }
 
   /**
-   * Run the given file through each of the cleaners to see if it should be deleted, deleting it if
+   * Run the given files through each of the cleaners to see if they should be deleted, deleting them if
    * necessary.
-   * @param fStat path of the file to check (and possibly delete)
-   * @throws IOException if cann't delete a file because of a filesystem issue
-   * @throws IllegalArgumentException if the file is a directory and has children
+   * @param files List of FileStatus for the files to check (and possibly delete)
+   * @return true iff successfully deleted all files
    */
-  private boolean checkAndDelete(FileStatus fStat) throws IOException, IllegalArgumentException {
-    Path filePath = fStat.getPath();
+  private boolean checkAndDeleteFiles(List<FileStatus> files) {
     // first check to see if the path is valid
-    if (!validate(filePath)) {
-      LOG.warn("Found a wrongly formatted file: " + filePath.getName() + " deleting it.");
-      boolean success = this.fs.delete(filePath, true);
-      if(!success)
-        LOG.warn("Attempted to delete: " + filePath
-            + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
-
-      return success;
+    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
+    List<FileStatus> invalidFiles = Lists.newArrayList();
+    for (FileStatus file : files) {
+      if (validate(file.getPath())) {
+        validFiles.add(file);
+      } else {
+        LOG.warn("Found a wrongly formatted file: " + file.getPath() + " - will delete it.");
+        invalidFiles.add(file);
+      }
     }
 
-    // check each of the cleaners for the file
+    Iterable<FileStatus> deletableValidFiles = validFiles;
+    // check each of the cleaners for the valid files
     for (T cleaner : cleanersChain) {
       if (cleaner.isStopped() || this.stopper.isStopped()) {
-        LOG.warn("A file cleaner" + this.getName() + " is stopped, won't delete any file in:"
+        LOG.warn("A file cleaner " + this.getName() + " is stopped, won't delete any more files in:"
            + this.oldFileDir);
        return false;
      }
 
-      if (!cleaner.isFileDeletable(fStat)) {
-        // this file is not deletable, then we are done
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(filePath + " is not deletable according to:" + cleaner);
+      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
+
+      // trace which cleaner is holding on to each file
+      if (LOG.isTraceEnabled()) {
+        ImmutableSet<FileStatus> filteredFileSet = ImmutableSet.copyOf(filteredFiles);
+        for (FileStatus file : deletableValidFiles) {
+          if (!filteredFileSet.contains(file)) {
+            LOG.trace(file.getPath() + " is not deletable according to:" + cleaner);
+          }
        }
-        return false;
      }
+
+      deletableValidFiles = filteredFiles;
     }
 
-    // delete this file if it passes all the cleaners
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Removing: " + filePath + " from archive");
-    }
-    boolean success = this.fs.delete(filePath, false);
-    if (!success) {
-      LOG.warn("Attempted to delete:" + filePath
-          + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
+    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
+    int deletedFileCount = 0;
+    for (FileStatus file : filesToDelete) {
+      Path filePath = file.getPath();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Removing: " + filePath + " from archive");
+      }
+      try {
+        boolean success = this.fs.delete(filePath, false);
+        if (success) {
+          deletedFileCount++;
+        } else {
+          LOG.warn("Attempted to delete:" + filePath
+              + ", but couldn't. Run cleaner chain and attempt to delete on next pass.");
+        }
+      } catch (IOException e) {
+        e = RemoteExceptionHandler.checkIOException(e);
+        LOG.warn("Error while deleting: " + filePath, e);
+      }
     }
-    return success;
-  }
+
+    return deletedFileCount == files.size();
+  }
 
   @Override
   public void cleanup() {
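A behavioral point worth noting in the rewrite above: the chore threads one Iterable through the whole delegate chain, so each cleaner only ever sees files that every earlier cleaner already approved, and the surviving set can only shrink. A standalone toy model of that narrowing, using plain strings instead of FileStatus (none of this is HBase code):

import java.util.Arrays;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

public class ChainNarrowingDemo {
  public static void main(String[] args) {
    Iterable<String> candidates = Arrays.asList("a.old", "b.old", "b.keep");

    // stage 1: only *.old files are deletable
    Iterable<String> afterFirst = Iterables.filter(candidates, new Predicate<String>() {
      @Override
      public boolean apply(String file) {
        return file.endsWith(".old");
      }
    });
    // stage 2: anything starting with "b" must be kept
    Iterable<String> afterSecond = Iterables.filter(afterFirst, new Predicate<String>() {
      @Override
      public boolean apply(String file) {
        return !file.startsWith("b");
      }
    });

    // prints [a.old]: only files approved by every stage survive
    System.out.println(Lists.newArrayList(afterSecond));
  }
}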
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
index e46e407..ab70633 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
@@ -31,9 +31,9 @@
 public interface FileCleanerDelegate extends Configurable, Stoppable {
 
   /**
-   * Should the master delete the file or keep it?
-   * @param fStat file status of the file to check
-   * @return true if the file is deletable, false if not
+   * Determines which of the given files are safe to delete
+   * @param files files to check for deletion
+   * @return files that are ok to delete according to this cleaner
    */
-  boolean isFileDeletable(FileStatus fStat);
+  Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files);
 }
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index 86669fa..2e3b32f 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -29,15 +29,18 @@ import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for
  * replication before deleting it when its TTL is over.
@@ -47,48 +50,46 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
   private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
   private ZooKeeperWatcher zkw;
   private ReplicationQueuesClient replicationQueues;
-  private final Set<String> hlogs = new HashSet<String>();
   private boolean stopped = false;
   private boolean aborted;
 
   @Override
-  public boolean isLogDeletable(FileStatus fStat) {
-
-    // all members of this class are null if replication is disabled, and we
-    // return true since false would render the LogsCleaner useless
+  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+    // all members of this class are null if replication is disabled,
+    // so we cannot filter the files
     if (this.getConf() == null) {
-      return true;
+      LOG.warn("Not configured - allowing all hlogs to be deleted");
+      return files;
     }
-    String log = fStat.getPath().getName();
-    // If we saw the hlog previously, let's consider it's still used
-    // At some point in the future we will refresh the list and it will be gone
-    if (this.hlogs.contains(log)) {
-      return false;
-    }
-
-    // Let's see it's still there
-    // This solution makes every miss very expensive to process since we
-    // almost completely refresh the cache each time
-    return !refreshHLogsAndSearch(log);
+
+    final Set<String> hlogs = loadHLogsFromQueues();
+    return Iterables.filter(files, new Predicate<FileStatus>() {
+      @Override
+      public boolean apply(FileStatus file) {
+        String hlog = file.getPath().getName();
+        boolean logInReplicationQueue = hlogs.contains(hlog);
+        if (LOG.isDebugEnabled()) {
+          if (logInReplicationQueue) {
+            LOG.debug("Found log in ZK, keeping: " + hlog);
+          } else {
+            LOG.debug("Didn't find this log in ZK, deleting: " + hlog);
+          }
+        }
+        return !logInReplicationQueue;
+      }});
   }
 
   /**
-   * Search through all the hlogs we have in ZK to refresh the cache
-   * If a log is specified and found, then we early out and return true
-   * @param searchedLog log we are searching for, pass null to cache everything
-   *          that's in zookeeper.
-   * @return false until a specified log is found.
+   * Load all hlogs in all replication queues from ZK.
    */
-  private boolean refreshHLogsAndSearch(String searchedLog) {
-    this.hlogs.clear();
-    final boolean lookForLog = searchedLog != null;
+  private Set<String> loadHLogsFromQueues() {
     List<String> rss = replicationQueues.getListOfReplicators();
     if (rss == null) {
-      LOG.debug("Didn't find any region server that replicates, deleting: " +
-          searchedLog);
-      return false;
+      LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
+      return ImmutableSet.of();
     }
+
+    Set<String> hlogs = Sets.newHashSet();
     for (String rs: rss) {
       List<String> listOfPeers = replicationQueues.getAllQueues(rs);
       // if rs just died, this will be null
@@ -98,17 +99,11 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
       for (String id : listOfPeers) {
         List<String> peersHlogs = replicationQueues.getLogsInQueue(rs, id);
         if (peersHlogs != null) {
-          this.hlogs.addAll(peersHlogs);
-        }
-        // early exit if we found the log
-        if(lookForLog && this.hlogs.contains(searchedLog)) {
-          LOG.debug("Found log in ZK, keeping: " + searchedLog);
-          return true;
+          hlogs.addAll(peersHlogs);
         }
       }
     }
-    LOG.debug("Didn't find this log in ZK, deleting: " + searchedLog);
-    return false;
+    return hlogs;
   }
 
   @Override
@@ -130,10 +125,8 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate implements Abo
     } catch (IOException e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     }
-    refreshHLogsAndSearch(null);
   }
 
-  @Override
   public void stop(String why) {
     if (this.stopped) return;
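One subtlety in both this class and BaseFileCleanerDelegate: Guava's Iterables.filter() returns a lazy view, so the predicate runs when the chore eventually iterates the result, not when getDeletableFiles() returns. That is presumably why the cleaner snapshots the ZK state into a final Set up front rather than querying ZooKeeper inside the predicate. A standalone reminder of the semantics (not HBase code):

import java.util.List;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

public class LazyFilterDemo {
  public static void main(String[] args) {
    List<String> names = Lists.newArrayList("hlog.1", "hlog.2");
    Iterable<String> view = Iterables.filter(names, new Predicate<String>() {
      @Override
      public boolean apply(String name) {
        return name.endsWith(".2");
      }
    });
    names.add("hlog.3.2"); // changes to the backing list are visible through the view
    System.out.println(Lists.newArrayList(view)); // prints [hlog.2, hlog.3.2]
  }
}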
diff --git hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index c7e9281..e068a56 100644
--- hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -1547,7 +1547,7 @@ public abstract class FSUtils {
    * @param fs file system
    * @param dir directory
    * @param filter path filter
-   * @return null if tabledir doesn't exist, otherwise FileStatus array
+   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
    */
   public static FileStatus [] listStatus(final FileSystem fs,
       final Path dir, final PathFilter filter) throws IOException {
@@ -1570,7 +1570,7 @@ public abstract class FSUtils {
    *
    * @param fs file system
    * @param dir directory
-   * @return null if tabledir doesn't exist, otherwise FileStatus array
+   * @return null if dir is empty or doesn't exist, otherwise FileStatus array
    */
   public static FileStatus[] listStatus(final FileSystem fs, final Path dir) throws IOException {
     return listStatus(fs, dir, null);
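The javadoc fix matters because FSUtils.listStatus() deliberately folds "empty" and "missing" into a single null return, which the new chore() code relies on. Callers must guard for it; a minimal sketch (hypothetical helper, not part of the patch):

import java.io.IOException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.FSUtils;

public final class ListStatusExample {
  // null means "empty or nonexistent", so normalize it to zero entries
  static int countEntries(FileSystem fs, Path dir) throws IOException {
    FileStatus[] entries = FSUtils.listStatus(fs, dir);
    return entries == null ? 0 : entries.length;
  }

  private ListStatusExample() {
  }
}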
diff --git hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
index 651d499..759516f 100644
--- hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
+++ hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
@@ -329,19 +329,20 @@ public class TestZooKeeperTableArchiveClient {
     BaseHFileCleanerDelegate delegateSpy = Mockito.spy(cleaner);
     final int[] counter = new int[] { 0 };
     final CountDownLatch finished = new CountDownLatch(1);
-    Mockito.doAnswer(new Answer<Boolean>() {
+    Mockito.doAnswer(new Answer<Iterable<FileStatus>>() {
       @Override
-      public Boolean answer(InvocationOnMock invocation) throws Throwable {
+      public Iterable<FileStatus> answer(InvocationOnMock invocation) throws Throwable {
         counter[0]++;
-        LOG.debug(counter[0] + "/ " + expected + ") Wrapping call to isFileDeletable for file: "
+        LOG.debug(counter[0] + "/ " + expected + ") Wrapping call to getDeletableFiles for files: "
            + invocation.getArguments()[0]);
-        Boolean ret = (Boolean) invocation.callRealMethod();
+        @SuppressWarnings("unchecked")
+        Iterable<FileStatus> ret = (Iterable<FileStatus>) invocation.callRealMethod();
        if (counter[0] >= expected) finished.countDown();
        return ret;
      }
-    }).when(delegateSpy).isFileDeletable(Mockito.any(FileStatus.class));
+    }).when(delegateSpy).getDeletableFiles(Mockito.anyListOf(FileStatus.class));
     cleaners.set(0, delegateSpy);
 
     return finished;