diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java index 8b3515a..6abaccb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/HFileCleaner.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -67,6 +68,16 @@ public class HFileCleaner extends CleanerChore impleme "hbase.regionserver.hfilecleaner.small.queue.size"; public final static int DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE = 10240; + // Configuration key for large file delete thread number + public final static String LARGE_HFILE_DELETE_THREAD_NUMBER = + "hbase.regionserver.hfilecleaner.large.threadnumber"; + public final static int DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER = 1; + + // Configuration key for small file delete thread number + public final static String SMALL_HFILE_DELETE_THREAD_NUMBER = + "hbase.regionserver.hfilecleaner.small.threadnumber"; + public final static int DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER = 1; + private static final Log LOG = LogFactory.getLog(HFileCleaner.class); StealJobQueue largeFileQueue; @@ -74,11 +85,13 @@ public class HFileCleaner extends CleanerChore impleme private int throttlePoint; private int largeQueueInitSize; private int smallQueueInitSize; + private int largeFileDeleteThreadNumber; + private int smallFileDeleteThreadNumber; private List threads = new ArrayList(); private boolean running; - private long deletedLargeFiles = 0L; - private long deletedSmallFiles = 0L; + private AtomicLong deletedLargeFiles = new AtomicLong(); + private AtomicLong deletedSmallFiles = new AtomicLong(); /** * @param period the period of time to sleep between each run @@ -100,6 +113,10 @@ public class HFileCleaner extends CleanerChore impleme conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize); smallFileQueue = largeFileQueue.getStealFromQueue(); + largeFileDeleteThreadNumber = + conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER); + smallFileDeleteThreadNumber = + conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER); startHFileDeleteThreads(); } @@ -183,30 +200,34 @@ public class HFileCleaner extends CleanerChore impleme final String n = Thread.currentThread().getName(); running = true; // start thread for large file deletion - Thread large = new Thread() { - @Override - public void run() { - consumerLoop(largeFileQueue); - } - }; - large.setDaemon(true); - large.setName(n + "-HFileCleaner.large-" + System.currentTimeMillis()); - large.start(); - LOG.debug("Starting hfile cleaner for large files: " + large.getName()); - threads.add(large); + for (int i = 0; i < largeFileDeleteThreadNumber; i++) { + Thread large = new Thread() { + @Override + public void run() { + consumerLoop(largeFileQueue); + } + }; + large.setDaemon(true); + large.setName(n + "-HFileCleaner.large." + i + "-" + System.currentTimeMillis()); + large.start(); + LOG.debug("Starting hfile cleaner for large files: " + large.getName()); + threads.add(large); + } // start thread for small file deletion - Thread small = new Thread() { - @Override - public void run() { - consumerLoop(smallFileQueue); - } - }; - small.setDaemon(true); - small.setName(n + "-HFileCleaner.small-" + System.currentTimeMillis()); - small.start(); - LOG.debug("Starting hfile cleaner for small files: " + small.getName()); - threads.add(small); + for (int i = 0; i < smallFileDeleteThreadNumber; i++) { + Thread small = new Thread() { + @Override + public void run() { + consumerLoop(smallFileQueue); + } + }; + small.setDaemon(true); + small.setName(n + "-HFileCleaner.small." + i + "-" + System.currentTimeMillis()); + small.start(); + LOG.debug("Starting hfile cleaner for small files: " + small.getName()); + threads.add(small); + } } protected void consumerLoop(BlockingQueue queue) { @@ -248,20 +269,20 @@ public class HFileCleaner extends CleanerChore impleme // Currently only for testing purpose private void countDeletedFiles(boolean isLargeFile, boolean fromLargeQueue) { if (isLargeFile) { - if (deletedLargeFiles == Long.MAX_VALUE) { + if (deletedLargeFiles.get() == Long.MAX_VALUE) { LOG.info("Deleted more than Long.MAX_VALUE large files, reset counter to 0"); - deletedLargeFiles = 0L; + deletedLargeFiles.set(0L); } - deletedLargeFiles++; + deletedLargeFiles.incrementAndGet(); } else { - if (deletedSmallFiles == Long.MAX_VALUE) { + if (deletedSmallFiles.get() == Long.MAX_VALUE) { LOG.info("Deleted more than Long.MAX_VALUE small files, reset counter to 0"); - deletedSmallFiles = 0L; + deletedSmallFiles.set(0L);; } if (fromLargeQueue && LOG.isTraceEnabled()) { LOG.trace("Stolen a small file deletion task in large file thread"); } - deletedSmallFiles++; + deletedSmallFiles.incrementAndGet(); } } @@ -354,12 +375,12 @@ public class HFileCleaner extends CleanerChore impleme @VisibleForTesting public long getNumOfDeletedLargeFiles() { - return deletedLargeFiles; + return deletedLargeFiles.get(); } @VisibleForTesting public long getNumOfDeletedSmallFiles() { - return deletedSmallFiles; + return deletedSmallFiles.get(); } @VisibleForTesting @@ -382,7 +403,9 @@ public class HFileCleaner extends CleanerChore impleme StringBuilder builder = new StringBuilder(); builder.append("Updating configuration for HFileCleaner, previous throttle point: ") .append(throttlePoint).append(", largeQueueInitSize: ").append(largeQueueInitSize) - .append(", smallQueueInitSize: ").append(smallQueueInitSize); + .append(", smallQueueInitSize: ").append(smallQueueInitSize).append(", largeThreadNumber: ") + .append(largeFileDeleteThreadNumber).append(", smallThreadNumber: ") + .append(smallFileDeleteThreadNumber); stopHFileDeleteThreads(); this.throttlePoint = conf.getInt(HFILE_DELETE_THROTTLE_THRESHOLD, DEFAULT_HFILE_DELETE_THROTTLE_THRESHOLD); @@ -391,7 +414,8 @@ public class HFileCleaner extends CleanerChore impleme this.smallQueueInitSize = conf.getInt(SMALL_HFILE_QUEUE_INIT_SIZE, DEFAULT_SMALL_HFILE_QUEUE_INIT_SIZE); // record the left over tasks - List leftOverTasks = new ArrayList<>(); + List leftOverTasks = + new ArrayList<>(largeFileQueue.size() + smallFileQueue.size()); for (HFileDeleteTask task : largeFileQueue) { leftOverTasks.add(task); } @@ -400,9 +424,15 @@ public class HFileCleaner extends CleanerChore impleme } largeFileQueue = new StealJobQueue<>(largeQueueInitSize, smallQueueInitSize); smallFileQueue = largeFileQueue.getStealFromQueue(); + largeFileDeleteThreadNumber = + conf.getInt(LARGE_HFILE_DELETE_THREAD_NUMBER, DEFAULT_LARGE_HFILE_DELETE_THREAD_NUMBER); + smallFileDeleteThreadNumber = + conf.getInt(SMALL_HFILE_DELETE_THREAD_NUMBER, DEFAULT_SMALL_HFILE_DELETE_THREAD_NUMBER); threads.clear(); builder.append("; new throttle point: ").append(throttlePoint).append(", largeQueueInitSize: ") - .append(largeQueueInitSize).append(", smallQueueInitSize: ").append(smallQueueInitSize); + .append(largeQueueInitSize).append(", smallQueueInitSize: ").append(smallQueueInitSize) + .append(", largeThreadNumber: ").append(largeFileDeleteThreadNumber) + .append(", smallThreadNumber: ").append(smallFileDeleteThreadNumber); LOG.debug(builder.toString()); startHFileDeleteThreads(); // re-dispatch the left over tasks diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java index 5f05488..72b625f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java @@ -327,6 +327,8 @@ public class TestHFileCleaner { final int UPDATE_QUEUE_INIT_SIZE = 1024; final int LARGE_FILE_NUM = 5; final int SMALL_FILE_NUM = 20; + final int LARGE_THREAD_NUM = 2; + final int SMALL_THREAD_NUM = 4; Configuration conf = UTIL.getConfiguration(); // no cleaner policies = delete all files @@ -369,6 +371,8 @@ public class TestHFileCleaner { newConf.setInt(HFileCleaner.HFILE_DELETE_THROTTLE_THRESHOLD, UPDATE_THROTTLE_POINT); newConf.setInt(HFileCleaner.LARGE_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE); newConf.setInt(HFileCleaner.SMALL_HFILE_QUEUE_INIT_SIZE, UPDATE_QUEUE_INIT_SIZE); + newConf.setInt(HFileCleaner.LARGE_HFILE_DELETE_THREAD_NUMBER, LARGE_THREAD_NUM); + newConf.setInt(HFileCleaner.SMALL_HFILE_DELETE_THREAD_NUMBER, SMALL_THREAD_NUM); LOG.debug("File deleted from large queue: " + cleaner.getNumOfDeletedLargeFiles() + "; from small queue: " + cleaner.getNumOfDeletedSmallFiles()); cleaner.onConfigurationChange(newConf); @@ -377,7 +381,7 @@ public class TestHFileCleaner { Assert.assertEquals(UPDATE_THROTTLE_POINT, cleaner.getThrottlePoint()); Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getLargeQueueInitSize()); Assert.assertEquals(UPDATE_QUEUE_INIT_SIZE, cleaner.getSmallQueueInitSize()); - Assert.assertEquals(2, cleaner.getCleanerThreads().size()); + Assert.assertEquals(LARGE_THREAD_NUM + SMALL_THREAD_NUM, cleaner.getCleanerThreads().size()); // wait until clean done and check t.join();