diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java
index b587200..ff42f50 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/BaseFileCleanerDelegate.java
@@ -18,12 +18,13 @@
 package org.apache.hadoop.hbase.master.cleaner;
 
 import java.util.Map;
+import java.util.Set;
+
+import org.apache.curator.shaded.com.google.common.collect.Sets;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.BaseConfigurable;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
-
 /**
  * Base class for file cleaners which allows subclasses to implement a simple
  * isFileDeletable method (which used to be the FileCleanerDelegate contract).
@@ -33,8 +34,8 @@ public abstract class BaseFileCleanerDelegate extends BaseConfigurable
 implements FileCleanerDelegate {
 
   @Override
-  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
-    return Iterables.filter(files, this::isFileDeletable);
+  public Set<FileStatus> getDeletableFiles(Set<FileStatus> files) {
+    return Sets.filter(files, this::isFileDeletable);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
index 19a7a69..a494fb3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/CleanerChore.java
@@ -21,16 +21,20 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.RecursiveTask;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.curator.shaded.com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -49,8 +53,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 
 /**
  * Abstract Cleaner that uses a chain of delegates to clean a directory of files
@@ -157,7 +159,28 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
   protected final Map<String, Object> params;
   private final AtomicBoolean enabled = new AtomicBoolean(true);
   protected List<T> cleanersChain;
-
+  private Set<PathAndLength> filesToDeleteLastRun = Collections.emptySet();
+
+  static class PathAndLength {
+    Path path;
+    long fileLength;
+    PathAndLength(Path p, long fileLength) {
+      this.path = p;
+      this.fileLength = fileLength;
+    }
+    @Override
+    public boolean equals(Object o1) {
+      if (!(o1 instanceof PathAndLength)) {
+        return false;
+      }
+      PathAndLength p1 = (PathAndLength) o1;
+      return this.path.equals(p1.path) && this.fileLength == p1.fileLength;
+    }
+    @Override
+    public int hashCode() {
+      return Objects.hash(path, fileLength);
+    }
+  }
   public static void initChorePool(Configuration conf) {
     if (POOL == null) {
       POOL = new DirScanPool(conf);
@@ -368,8 +391,8 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
     }
 
     // first check to see if the path is valid
-    List<FileStatus> validFiles = Lists.newArrayListWithCapacity(files.size());
-    List<FileStatus> invalidFiles = Lists.newArrayList();
+    HashSet<FileStatus> validFiles = Sets.newHashSetWithExpectedSize(files.size());
+    HashSet<FileStatus> invalidFiles = Sets.newHashSet();
     for (FileStatus file : files) {
       if (validate(file.getPath())) {
         validFiles.add(file);
@@ -379,7 +402,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
       }
     }
 
-    Iterable<FileStatus> deletableValidFiles = validFiles;
+    Set<FileStatus> deletableValidFiles = validFiles;
     // check each of the cleaners for the valid files
     for (T cleaner : cleanersChain) {
       if (cleaner.isStopped() || this.getStopper().isStopped()) {
@@ -388,7 +411,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
         return false;
       }
 
-      Iterable<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
+      Set<FileStatus> filteredFiles = cleaner.getDeletableFiles(deletableValidFiles);
 
       // trace which cleaner is holding on to each file
      if (LOG.isTraceEnabled()) {
@@ -403,22 +426,33 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
       deletableValidFiles = filteredFiles;
     }
 
-    Iterable<FileStatus> filesToDelete = Iterables.concat(invalidFiles, deletableValidFiles);
+    Set<FileStatus> candidates = Sets.union(invalidFiles, deletableValidFiles);
+    Set<PathAndLength> transformedCandidates = transform(candidates);
+    Set<PathAndLength> filesToDelete = Sets.intersection(filesToDeleteLastRun,
+        transformedCandidates);
+    filesToDeleteLastRun = transformedCandidates;
     return deleteFiles(filesToDelete) == files.size();
   }
 
+  // transform Set of FileStatus to Set of Path
+  private Set<PathAndLength> transform(Set<FileStatus> candidates) {
+    Set<PathAndLength> transformed = Sets.newHashSet();
+    for (FileStatus file : candidates) {
+      transformed.add(new PathAndLength(file.getPath(), file.getLen()));
+    }
+    return transformed;
+  }
   /**
    * Delete the given files
    * @param filesToDelete files to delete
    * @return number of deleted files
    */
-  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
+  protected int deleteFiles(Iterable<PathAndLength> filesToDelete) {
     int deletedFileCount = 0;
-    for (FileStatus file : filesToDelete) {
-      Path filePath = file.getPath();
+    for (PathAndLength filePath : filesToDelete) {
       LOG.trace("Removing {} from archive", filePath);
       try {
-        boolean success = this.fs.delete(filePath, false);
+        boolean success = this.fs.delete(filePath.path, false);
         if (success) {
           deletedFileCount++;
         } else {
@@ -503,7 +537,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
 
       boolean allSubdirsDeleted = true;
       if (!subDirs.isEmpty()) {
-        List<CleanerTask> tasks = Lists.newArrayListWithCapacity(subDirs.size());
+        Set<CleanerTask> tasks = Sets.newHashSetWithExpectedSize(subDirs.size());
         sortByConsumedSpace(subDirs);
         for (FileStatus subdir : subDirs) {
           CleanerTask task = new CleanerTask(subdir, false);
@@ -568,7 +602,7 @@ public abstract class CleanerChore<T extends FileCleanerDelegate> extends Schedu
      * @return true if all subdirs deleted successfully, false for patial/all failures
      * @throws IOException something happen during computation
      */
-    private boolean getCleanResult(List<CleanerTask> tasks) throws IOException {
+    private boolean getCleanResult(Iterable<CleanerTask> tasks) throws IOException {
       boolean cleaned = true;
       try {
         for (CleanerTask task : tasks) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
index f2af138..e8ad8bb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/FileCleanerDelegate.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.Stoppable;
 
 import java.util.Map;
+import java.util.Set;
 
 /**
  * General interface for cleaning files from a folder (generally an archive or
@@ -37,7 +38,7 @@ public interface FileCleanerDelegate extends Configurable, Stoppable {
   * @param files files to check for deletion
   * @return files that are ok to delete according to this cleaner
   */
-  Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files);
+  Set<FileStatus> getDeletableFiles(Set<FileStatus> files);
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
index 7ad6177..edc00fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java
@@ -27,7 +27,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
@@ -155,11 +154,11 @@ public class HFileCleaner extends CleanerChore {
   }
 
   @Override
-  public int deleteFiles(Iterable<FileStatus> filesToDelete) {
+  public int deleteFiles(Iterable<PathAndLength> filesToDelete) {
     int deletedFiles = 0;
     List<HFileDeleteTask> tasks = new ArrayList<HFileDeleteTask>();
     // construct delete tasks and add into relative queue
-    for (FileStatus file : filesToDelete) {
+    for (PathAndLength file : filesToDelete) {
       HFileDeleteTask task = deleteFile(file);
       if (task != null) {
         tasks.add(task);
@@ -179,14 +178,14 @@ public class HFileCleaner extends CleanerChore {
   * @param file the file to delete
   * @return HFileDeleteTask to track progress
   */
-  private HFileDeleteTask deleteFile(FileStatus file) {
+  private HFileDeleteTask deleteFile(PathAndLength file) {
     HFileDeleteTask task = new HFileDeleteTask(file, cleanerThreadTimeoutMsec);
     boolean enqueued = dispatch(task);
     return enqueued ? task : null;
   }
 
   private boolean dispatch(HFileDeleteTask task) {
-    if (task.fileLength >= this.throttlePoint) {
+    if (task.pathAndLength.fileLength >= this.throttlePoint) {
       if (!this.largeFileQueue.offer(task)) {
         // should never arrive here as long as we use PriorityQueue
         LOG.trace("Large file deletion queue is full");
@@ -256,17 +255,18 @@ public class HFileCleaner extends CleanerChore {
          break;
        }
        if (task != null) {
-          LOG.trace("Removing {}", task.filePath);
+          LOG.trace("Removing {}", task.pathAndLength.path);
          boolean succeed;
          try {
-            succeed = this.fs.delete(task.filePath, false);
+            succeed = this.fs.delete(task.pathAndLength.path, false);
          } catch (IOException e) {
-            LOG.warn("Failed to delete {}", task.filePath, e);
+            LOG.warn("Failed to delete {}", task.pathAndLength.path, e);
            succeed = false;
          }
          task.setResult(succeed);
          if (succeed) {
-            countDeletedFiles(task.fileLength >= throttlePoint, queue == largeFileQueue);
+            countDeletedFiles(task.pathAndLength.fileLength >= throttlePoint,
+                queue == largeFileQueue);
          }
        }
@@ -311,7 +311,7 @@ public class HFileCleaner extends CleanerChore {
     @Override
     public int compare(HFileDeleteTask o1, HFileDeleteTask o2) {
       // larger file first so reverse compare
-      int cmp = Long.compare(o2.fileLength, o1.fileLength);
+      int cmp = Long.compare(o2.pathAndLength.fileLength, o1.pathAndLength.fileLength);
       if (cmp != 0) {
         return cmp;
       }
@@ -324,13 +324,11 @@ public class HFileCleaner extends CleanerChore {
     boolean done = false;
     boolean result;
-    final Path filePath;
-    final long fileLength;
+    final PathAndLength pathAndLength;
     final long timeoutMsec;
 
-    public HFileDeleteTask(FileStatus file, long timeoutMsec) {
-      this.filePath = file.getPath();
-      this.fileLength = file.getLen();
+    public HFileDeleteTask(PathAndLength file, long timeoutMsec) {
+      this.pathAndLength = file;
       this.timeoutMsec = timeoutMsec;
     }
@@ -352,13 +350,13 @@ public class HFileCleaner extends CleanerChore {
           return this.result;
         }
         if (waitTimeMsec > timeoutMsec) {
-          LOG.warn("Wait more than " + timeoutMsec + " ms for deleting " + this.filePath
+          LOG.warn("Wait more than " + timeoutMsec + " ms for deleting " + this.pathAndLength.path
            + ", exit...");
           return false;
         }
       }
     } catch (InterruptedException e) {
-      LOG.warn("Interrupted while waiting for result of deleting " + filePath
+      LOG.warn("Interrupted while waiting for result of deleting " + pathAndLength.path
        + ", will return false", e);
      return false;
    }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
index db098e2..a6bd015 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/LogCleaner.java
@@ -112,9 +112,9 @@ public class LogCleaner extends CleanerChore {
   }
 
   @Override
-  protected int deleteFiles(Iterable<FileStatus> filesToDelete) {
+  protected int deleteFiles(Iterable<PathAndLength> filesToDelete) {
     List<CleanerContext> results = new LinkedList<>();
-    for (FileStatus toDelete : filesToDelete) {
+    for (PathAndLength toDelete : filesToDelete) {
       CleanerContext context = CleanerContext.createCleanerContext(toDelete,
           cleanerThreadTimeoutMsec);
       if (context != null) {
@@ -180,8 +180,8 @@ public class LogCleaner extends CleanerChore {
       try {
         context = pendingDelete.take();
         if (context != null) {
-          FileStatus toClean = context.getTargetToClean();
-          succeed = this.fs.delete(toClean.getPath(), false);
+          Path toClean = context.getTargetToClean();
+          succeed = this.fs.delete(toClean, false);
         }
       } catch (InterruptedException ite) {
         // It's most likely from configuration changing request
@@ -220,16 +220,16 @@ public class LogCleaner extends CleanerChore {
 
   private static final class CleanerContext {
 
-    final FileStatus target;
+    final PathAndLength target;
     volatile boolean result;
     volatile boolean setFromCleaner = false;
     long timeoutMsec;
 
-    static CleanerContext createCleanerContext(FileStatus status, long timeoutMsec) {
+    static CleanerContext createCleanerContext(PathAndLength status, long timeoutMsec) {
       return status != null ? new CleanerContext(status, timeoutMsec) : null;
     }
 
-    private CleanerContext(FileStatus status, long timeoutMsec) {
+    private CleanerContext(PathAndLength status, long timeoutMsec) {
       this.target = status;
       this.result = false;
       this.timeoutMsec = timeoutMsec;
@@ -261,8 +261,8 @@ public class LogCleaner extends CleanerChore {
       return result;
     }
 
-    FileStatus getTargetToClean() {
-      return target;
+    Path getTargetToClean() {
+      return target.path;
     }
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java
index 358b4ea..1373b96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotFileCache.java
@@ -29,6 +29,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.curator.shaded.com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -178,10 +179,10 @@
   // XXX this is inefficient to synchronize on the method, when what we really need to guard against
   // is an illegal access to the cache. Really we could do a mutex-guarded pointer swap on the
   // cache, but that seems overkill at the moment and isn't necessarily a bottleneck.
-  public synchronized Iterable<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
+  public synchronized Set<FileStatus> getUnreferencedFiles(Iterable<FileStatus> files,
       final SnapshotManager snapshotManager) throws IOException {
-    List<FileStatus> unReferencedFiles = Lists.newArrayList();
+    Set<FileStatus> unReferencedFiles = Sets.newHashSet();
     List<String> snapshotsInProgress = null;
     boolean refreshed = false;
     for (FileStatus file : files) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotHFileCleaner.java
index 7c866c2..3424775 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotHFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotHFileCleaner.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
@@ -63,7 +64,7 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
   private MasterServices master;
 
   @Override
-  public synchronized Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+  public synchronized Set<FileStatus> getDeletableFiles(Set<FileStatus> files) {
     try {
       return cache.getUnreferencedFiles(files, master.getSnapshotManager());
     } catch (CorruptedSnapshotException cse) {
@@ -71,7 +72,7 @@ public class SnapshotHFileCleaner extends BaseHFileCleanerDelegate {
     } catch (IOException e) {
       LOG.error("Exception while checking if files were valid, keeping them just in case.", e);
     }
-    return Collections.emptyList();
+    return Collections.emptySet();
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
index 7b62169..f267ff6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.replication.master;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Set;
+
+import org.apache.curator.shaded.com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -35,8 +37,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 
 /**
  * Implementation of a file cleaner that checks if a hfile is still scheduled for replication before
@@ -50,7 +50,7 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
   private boolean stopped = false;
 
   @Override
-  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+  public Set<FileStatus> getDeletableFiles(Set<FileStatus> files) {
     // all members of this class are null if replication is disabled,
     // so we cannot filter the files
     if (this.getConf() == null) {
@@ -64,22 +64,19 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
       hfileRefs = rqs.getAllHFileRefs();
     } catch (ReplicationException e) {
       LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files");
-      return Collections.emptyList();
+      return Collections.emptySet();
     }
-    return Iterables.filter(files, new Predicate<FileStatus>() {
-      @Override
-      public boolean apply(FileStatus file) {
-        String hfile = file.getPath().getName();
-        boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
-        if (LOG.isDebugEnabled()) {
-          if (foundHFileRefInQueue) {
-            LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
-          } else {
-            LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
-          }
+    return Sets.filter(files, file -> {
+      String hfile = file.getPath().getName();
+      boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
+      if (LOG.isDebugEnabled()) {
+        if (foundHFileRefInQueue) {
+          LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
+        } else {
+          LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
        }
-        return !foundHFileRefInQueue;
      }
+      return !foundHFileRefInQueue;
    });
  }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
index b9ed8dd..dae0dd9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.Set;
 
+import org.apache.curator.shaded.com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -34,9 +35,9 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Predicate;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
-import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 
 /**
  * Implementation of a log cleaner that checks if a log is still scheduled for
@@ -64,8 +65,16 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
     }
   }
 
+  private Predicate<FileStatus> fileFilter = (file) -> {
+    String wal = file.getPath().getName();
+    boolean logInReplicationQueue = wals.contains(wal);
+    if (logInReplicationQueue) {
+      LOG.debug("Found up in ZooKeeper, NOT deleting={}", wal);
+    }
+    return !logInReplicationQueue && (file.getModificationTime() < readZKTimestamp);
+  };
   @Override
-  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+  public Set<FileStatus> getDeletableFiles(Set<FileStatus> files) {
     // all members of this class are null if replication is disabled,
     // so we cannot filter the files
     if (this.getConf() == null) {
@@ -73,19 +82,9 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
     }
 
     if (wals == null) {
-      return Collections.emptyList();
+      return Collections.emptySet();
     }
-    return Iterables.filter(files, new Predicate<FileStatus>() {
-      @Override
-      public boolean apply(FileStatus file) {
-        String wal = file.getPath().getName();
-        boolean logInReplicationQueue = wals.contains(wal);
-        if (logInReplicationQueue) {
-          LOG.debug("Found up in ZooKeeper, NOT deleting={}", wal);
-        }
-        return !logInReplicationQueue && (file.getModificationTime() < readZKTimestamp);
-      }
-    });
+    return Sets.filter(files, fileFilter);
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
index 16f3930..5bfe897 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java
@@ -377,7 +377,7 @@ public class TestZooKeeperTableArchiveClient {
         if (counter[0] >= expected) finished.countDown();
         return ret;
       }
-    }).when(delegateSpy).getDeletableFiles(Mockito.anyListOf(FileStatus.class));
+    }).when(delegateSpy).getDeletableFiles(Mockito.anySetOf(FileStatus.class));
     cleaners.set(0, delegateSpy);
 
     return finished;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 882ea9d..aced4dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -27,10 +27,11 @@ import static org.mockito.Mockito.spy;
 import java.io.IOException;
 import java.net.URLEncoder;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.curator.shaded.com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -69,8 +70,6 @@ import org.mockito.stubbing.Answer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
 @Category({MasterTests.class, MediumTests.class})
 public class TestLogsCleaner {
 
@@ -184,6 +183,7 @@ public class TestLogsCleaner {
 
     LogCleaner cleaner = new LogCleaner(1000, server, conf, fs, oldLogDir);
     cleaner.chore();
+    cleaner.chore();
 
     // In oldWALs we end up with the current WAL, a newer WAL, the 3 old WALs which
     // are scheduled for replication and masterProcedureWALs directory
@@ -207,7 +207,7 @@ public class TestLogsCleaner {
 
     ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
 
-    List<FileStatus> dummyFiles = Lists.newArrayList(
+    Set<FileStatus> dummyFiles = Sets.newHashSet(
         new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
         new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
     );
@@ -250,26 +250,30 @@ public class TestLogsCleaner {
    * When zk is working both files should be returned
    * @throws Exception from ZK watcher
    */
-  @Test(timeout=10000)
+  @Test(timeout=30000)
   public void testZooKeeperNormal() throws Exception {
     Configuration conf = TEST_UTIL.getConfiguration();
     ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
 
-    List<FileStatus> dummyFiles = Lists.newArrayList(
-        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log1")),
-        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path("log2"))
+    Path log1 = new Path("log1");
+    Path log2 = new Path("log2");
+    Set<FileStatus> dummyFiles = Sets.newHashSet(
+        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), log1),
+        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), log2)
     );
 
     ZKWatcher zkw = new ZKWatcher(conf, "testZooKeeperAbort-normal", null);
     try {
       cleaner.setConf(conf, zkw);
       cleaner.preClean();
-      Iterable<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
+      Set<FileStatus> filesToDelete = cleaner.getDeletableFiles(dummyFiles);
       Iterator<FileStatus> iter = filesToDelete.iterator();
       assertTrue(iter.hasNext());
-      assertEquals(new Path("log1"), iter.next().getPath());
+      Path p = iter.next().getPath();
+      assertTrue(p.equals(log1) || p.equals(log2));
       assertTrue(iter.hasNext());
-      assertEquals(new Path("log2"), iter.next().getPath());
+      p = iter.next().getPath();
+      assertTrue(p.equals(log1) || p.equals(log2));
       assertFalse(iter.hasNext());
     } finally {
       zkw.close();
@@ -319,6 +323,8 @@ public class TestLogsCleaner {
     assertEquals(sizeToChange, cleaner.getSizeOfCleaners());
     assertEquals(threadTimeoutToChange, cleaner.getCleanerThreadTimeoutMsec());
     assertEquals(threadCheckIntervalToChange, cleaner.getCleanerThreadCheckIntervalMsec());
+    cleaner.chore();
+    cleaner.chore();
     // Stop chore
     thread.join();
     status = fs.listStatus(oldWALsDir);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index 24b930c..915dd25 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -26,8 +26,12 @@ import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
+
+import org.apache.curator.shaded.com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -70,8 +74,6 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
 @Category({ MasterTests.class, SmallTests.class })
 public class TestReplicationHFileCleaner {
 
@@ -159,7 +161,7 @@ public class TestReplicationHFileCleaner {
     fs.createNewFile(deletablefile);
     assertTrue("Test file not created!", fs.exists(deletablefile));
 
-    List<FileStatus> files = new ArrayList<>(2);
+    Set<FileStatus> files = new HashSet<>(2);
     FileStatus f = new FileStatus();
     f.setPath(deletablefile);
     files.add(f);
@@ -194,8 +196,8 @@ public class TestReplicationHFileCleaner {
   public void testZooKeeperAbort() throws Exception {
     ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
 
-    List<FileStatus> dummyFiles =
-        Lists.newArrayList(new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
+    Set<FileStatus> dummyFiles = Sets.newHashSet(
+        new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
            "hfile1")), new FileStatus(100, false, 3, 100, System.currentTimeMillis(), new Path(
            "hfile2")));