diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java
index b587200..ff42f50 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java
@@ -18,12 +18,13 @@
 package org.apache.hadoop.hbase.master.cleaner;

 import java.util.Map;
+import java.util.Set;
+
+import org.apache.curator.shaded.com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.BaseConfigurable;
 import org.apache.yetus.audience.InterfaceAudience;

-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
-
 /**
  * Base class for file cleaners which allows subclasses to implement a simple
  * isFileDeletable method (which used to be the FileCleanerDelegate contract).
@@ -33,8 +34,8 @@ public abstract class BaseFileCleanerDelegate extends BaseConfigurable
 implements FileCleanerDelegate {

   @Override
-  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
-    return Iterables.filter(files, this::isFileDeletable);
+  public Set<FileStatus> getDeletableFiles(Set<FileStatus> files) {
+    return Sets.filter(files, this::isFileDeletable);
   }

   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index 19a7a69..218387d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -21,16 +21,19 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveTask;
 import java.util.concurrent.atomic.AtomicBoolean;

+import org.apache.curator.shaded.com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,8 +52,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;

 /**
  * Abstract Cleaner that uses a chain of delegates to clean a directory of files
@@ -157,6 +158,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   protected final Map<String, Object> params;
   private final AtomicBoolean enabled = new AtomicBoolean(true);
   protected List<T> cleanersChain;
+  private Set<FileStatus> filesToDeleteLastRun = Collections.emptySet();

   public static void initChorePool(Configuration conf) {
     if (POOL == null) {
@@ -368,8 +370,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
     }

     // first check to see if the path is valid
-    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
-    List<FileStatus> invalidFiles = Lists.newArrayList();
+    HashSet<FileStatus> validFiles = Sets.newHashSetWithExpectedSize(files.size());
+    HashSet<FileStatus> invalidFiles = Sets.newHashSet();
     for (FileStatus file : files) {
       if (validate(file.getPath())) {
         validFiles.add(file);
@@ -379,7 +381,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
       }
     }

-    Iterable<FileStatus> deletableValidFiles = validFiles;
+    Set<FileStatus> deletableValidFiles = validFiles;
     // check each of the cleaners for the valid files
     for (T cleaner : cleanersChain) {
       if (cleaner.isStopped() || this.getStopper().isStopped()) {
@@ -388,7 +390,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
         return false;
       }

-      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
+      Set<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);

       // trace which cleaner is holding on to each file
       if (LOG.isTraceEnabled()) {
@@ -403,7 +405,9 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
       deletableValidFiles = filteredFiles;
     }

-    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
+    Set<FileStatus> candidates = Sets.union(invalidFiles, deletableValidFiles);
+    Set<FileStatus> filesToDelete = Sets.intersection(filesToDeleteLastRun, candidates);
+    filesToDeleteLastRun = candidates;
     return deleteFiles(filesToDelete) == files.size();
   }

@@ -503,7 +507,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu

       boolean allSubdirsDeleted = true;
       if (!subDirs.isEmpty()) {
-        List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
+        Set<CleanerTask> tasks = Sets.newHashSetWithExpectedSize(subDirs.size());
         sortByConsumedSpace(subDirs);
         for (FileStatus subdir : subDirs) {
           CleanerTask task = new CleanerTask(subdir, false);
@@ -568,7 +572,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
     * @return true if all subdirs deleted successfully, false for patial/all failures
     * @throws IOException something happen during computation
     */
-    private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
+    private boolean getCleanResult(Iterable<CleanerTask> tasks) throws IOException {
      boolean cleaned = true;
      try {
        for (CleanerTask task : tasks) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
index f2af138..e8ad8bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.Stoppable;

 import java.util.Map;
+import java.util.Set;

 /**
  * General interface for cleaning files from a folder (generally an archive or
@@ -37,7 +38,7 @@ public interface FileCleanerDelegate extends Configurable, Stoppable {
    * @param files files to check for deletion
    * @return files that are ok to delete according to this cleaner
    */
-  Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files);
+  Set<FileStatus> getDeletableFiles(Set<FileStatus> files);

   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java
index 358b4ea..1373b96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java
@@ -29,6 +29,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.locks.ReentrantLock;

+import org.apache.curator.shaded.com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -178,10 +179,10 @@ public class SnapshotFileCache implements Stoppable {

   // XXX this is inefficient to synchronize on the method, when what we really need to guard against
   // is an illegal access to the cache. Really we could do a mutex-guarded pointer swap on the
   // cache, but that seems overkill at the moment and isn't necessarily a bottleneck.
-  public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
+  public synchronized Set<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
       final SnapshotManager snapshotManager) throws IOException {
-    List<FileStatus> unReferencedFiles = Lists.newArrayList();
+    Set<FileStatus> unReferencedFiles = Sets.newHashSet();
     List<String> snapshotsInProgress = null;
     boolean refreshed = false;
     for (FileStatus file : files) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotHFileCleaner.java
index 7c866c2..3424775 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotHFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotHFileCleaner.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Set;

 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -63,7 +64,7 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
   private MasterServices master;

   @Override
-  public synchronized Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+  public synchronized Set<FileStatus> getDeletableFiles(Set<FileStatus> files) {
     try {
       return cache.getUnreferencedFiles(files, master.getSnapshotManager());
     } catch (CorruptedSnapshotException cse) {
@@ -71,7 +72,7 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
     } catch (IOException e) {
       LOG.error("Exception while checking if files were valid, keeping them just in case.", e);
     }
-    return Collections.emptyList();
+    return Collections.emptySet();
   }

   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
index 7b62169..f267ff6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.replication.master;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Set;
+
+import org.apache.curator.shaded.com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -35,8 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

 /**
  * Implementation of a file cleaner that checks if a hfile is still scheduled for replication before
@@ -50,7 +50,7 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
   private boolean stopped = false;

   @Override
-  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+  public Set<FileStatus> getDeletableFiles(Set<FileStatus> files) {
     // all members of this class are null if replication is disabled,
     // so we cannot filter the files
     if (this.getConf() == null) {
@@ -64,22 +64,19 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
       hfileRefs = rqs.getAllHFileRefs();
     } catch (ReplicationException e) {
       LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files");
-      return Collections.emptyList();
+      return Collections.emptySet();
     }
-    return Iterables.filter(files, new Predicate<FileStatus>() {
-      @Override
-      public boolean apply(FileStatus file) {
-        String hfile = file.getPath().getName();
-        boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
-        if (LOG.isDebugEnabled()) {
-          if (foundHFileRefInQueue) {
-            LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
-          } else {
-            LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
-          }
+    return Sets.filter(files, file -> {
+      String hfile = file.getPath().getName();
+      boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
+      if (LOG.isDebugEnabled()) {
+        if (foundHFileRefInQueue) {
+          LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
+        } else {
+          LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
         }
-        return !foundHFileRefInQueue;
       }
+      return !foundHFileRefInQueue;
     });
   }

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index b9ed8dd..dae0dd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Set;

+import org.apache.curator.shaded.com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -34,9 +35,9 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import com.google.common.base.Predicate;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;

 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for
@@ -64,8 +65,16 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
     }
   }

+  private Predicate<FileStatus> fileFilter = (file) -> {
+    String wal = file.getPath().getName();
+    boolean logInReplicationQueue = wals.contains(wal);
+    if (logInReplicationQueue) {
+      LOG.debug("Found up in ZooKeeper, NOT deleting={}", wal);
+    }
+    return !logInReplicationQueue && (file.getModificationTime() < readZKTimestamp);
+  };
   @Override
-  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+  public Set<FileStatus> getDeletableFiles(Set<FileStatus> files) {
     // all members of this class are null if replication is disabled,
     // so we cannot filter the files
     if (this.getConf() == null) {
@@ -73,19 +82,9 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
     }

     if (wals == null) {
-      return Collections.emptyList();
+      return Collections.emptySet();
     }
-    return Iterables.filter(files, new Predicate<FileStatus>() {
-      @Override
-      public boolean apply(FileStatus file) {
-        String wal = file.getPath().getName();
-        boolean logInReplicationQueue = wals.contains(wal);
-        if (logInReplicationQueue) {
-          LOG.debug("Found up in ZooKeeper, NOT deleting={}", wal);
-        }
-        return !logInReplicationQueue && (file.getModificationTime() < readZKTimestamp);
-      }
-    });
+    return Sets.filter(files, fileFilter);
   }

   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
index 16f3930..5bfe897 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
@@ -377,7 +377,7 @@ public class TestZooKeeperTableArchiveClient {
         if (counter[0] >= expected) finished.countDown();
         return ret;
       }
-    }).when(delegateSpy).getDeletableFiles(Mockito.anyListOf(FileStatus.class));
+    }).when(delegateSpy).getDeletableFiles(Mockito.anySetOf(FileStatus.class));
     cleaners.set(0, delegateSpy);

     return finished;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 882ea9d..1612065 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -27,10 +27,11 @@ import static org.mockito.Mockito.spy;
 import java.io.IOException;
 import java.net.URLEncoder;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;

+import org.apache.curator.shaded.com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -69,8 +70,6 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
 @Category({MasterTests.class, MediumTests.class})
 public class TestLogsCleaner {

@@ -184,6 +183,7 @@ public class TestLogsCleaner {

     LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir);
     cleaner.chore();
+    cleaner.chore();

     // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
     // are scheduled for replication and masterProcedureWALs directory
@@ -207,7 +207,7 @@ public class TestLogsCleaner {

     ReplicationLogCleaner cleaner = new ReplicationLogCleaner();

-    List<FileStatus> dummyFiles = Lists.newArrayList(
+    Set<FileStatus> dummyFiles = Sets.newHashSet(
         new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
         new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
     );
@@ -250,26 +250,30 @@ public class TestLogsCleaner {
    * When zk is working both files should be returned
    * @throws Exception from ZK watcher
    */
-  @Test(timeout=10000)
+  @Test(timeout=30000)
   public void testZooKeeperNormal() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
     ReplicationLogCleaner cleaner = new ReplicationLogCleaner();

-    List<FileStatus> dummyFiles = Lists.newArrayList(
-        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
-        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
+    Path log1 = new Path("log1");
+    Path log2 = new Path("log2");
+    Set<FileStatus> dummyFiles = Sets.newHashSet(
+        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), log1),
+        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), log2)
     );

     ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null);
     try {
       cleaner.setConf(conf, zkw);
       cleaner.preClean();
-      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
+      Set<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
       Iterator<FileStatus> iter = filesToDelete.iterator();
       assertTrue(iter.hasNext());
-      assertEquals(new Path("log1"), iter.next().getPath());
+      Path p = iter.next().getPath();
+      assertTrue(p.equals(log1) || p.equals(log2));
       assertTrue(iter.hasNext());
-      assertEquals(new Path("log2"), iter.next().getPath());
+      p = iter.next().getPath();
+      assertTrue(p.equals(log1) || p.equals(log2));
       assertFalse(iter.hasNext());
     } finally {
       zkw.close();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index 24b930c..915dd25 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -26,8 +26,12 @@ import static org.mockito.Mockito.spy;

 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
+
+import org.apache.curator.shaded.com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -70,8 +74,6 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
 @Category({ MasterTests.class, SmallTests.class })
 public class TestReplicationHFileCleaner {

@@ -159,7 +161,7 @@ public class TestReplicationHFileCleaner {
     fs.createNewFile(deletablefile);
     assertTrue("Test file not created!", fs.exists(deletablefile));

-    List<FileStatus> files = new ArrayList<>(2);
+    Set<FileStatus> files = new HashSet<>(2);
     FileStatus f = new FileStatus();
     f.setPath(deletablefile);
     files.add(f);
@@ -194,8 +196,8 @@ public class TestReplicationHFileCleaner {
   public void testZooKeeperAbort() throws Exception {
     ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();

-    List<FileStatus> dummyFiles =
-        Lists.newArrayList(new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
+    Set<FileStatus> dummyFiles = Sets.newHashSet(
+        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
             "hfile1")), new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
             "hfile2")));
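
For reference, below is a minimal, self-contained sketch (not part of the patch) of the delayed-deletion behaviour the CleanerChore hunks introduce: a candidate file is only actually removed once it has shown up as deletable in two consecutive chore runs, which is also why TestLogsCleaner now invokes cleaner.chore() twice. The class and method names are invented for illustration, and plain java.util collections stand in for the Guava Sets helpers the patch uses.

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper mirroring the filesToDeleteLastRun bookkeeping added to CleanerChore.
class TwoRunDeletionGate<F> {
  // Deletion candidates remembered from the previous chore run.
  private Set<F> lastRunCandidates = Collections.emptySet();

  // Returns only the files that were candidates in both the previous and the current run,
  // then remembers the current candidates for the next run.
  Set<F> filesSafeToDelete(Set<F> candidatesThisRun) {
    Set<F> toDelete = new HashSet<>(candidatesThisRun);
    toDelete.retainAll(lastRunCandidates);  // the Sets.intersection(filesToDeleteLastRun, candidates) step
    lastRunCandidates = candidatesThisRun;  // the filesToDeleteLastRun = candidates step
    return toDelete;
  }
}

A file that first becomes deletable in run N is therefore returned (and deleted) only in run N+1; a single pass over a fresh gate always yields an empty set.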