diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 14dded1..866110c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1087,6 +1087,13 @@ public final class HConstants { "hbase.heap.occupancy.high_water_mark"; public static final float DEFAULT_HEAP_OCCUPANCY_HIGH_WATERMARK = 0.98f; + /** + * The max number of threads used for splitting storefiles in parallel during + * the region split process. + */ + public static final String REGION_SPLIT_THREADS_MAX = + "hbase.regionserver.region.split.threads.max"; + private HConstants() { // Can't be instantiated with this ctor. } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java index 4bfc092..9be7385 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java @@ -819,17 +819,29 @@ public class SplitTransaction { // The following code sets up a thread pool executor with as many slots as // there's files to split. It then fires up everything, waits for // completion and finally checks for any exception - int nbFiles = hstoreFilesToSplit.size(); + int nbFiles = 0; + for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) { + nbFiles += entry.getValue().size(); + } if (nbFiles == 0) { // no file needs to be splitted. return new Pair<Integer, Integer>(0,0); } - LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent); + // Default max #threads to use is the smaller of table's configured number of blocking store + // files or the available number of logical cores.
+ int defMaxThreads = Math.min(parent.conf.getInt(HStore.BLOCKING_STOREFILES_KEY, + HStore.DEFAULT_BLOCKING_STOREFILE_COUNT), + Runtime.getRuntime().availableProcessors()); + // Max #threads is the smaller of the number of storefiles or the default max determined above. + int maxThreads = Math.min(parent.conf.getInt(HConstants.REGION_SPLIT_THREADS_MAX, + defMaxThreads), nbFiles); + LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent + + " using " + maxThreads + " threads"); ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); builder.setNameFormat("StoreFileSplitter-%1$d"); ThreadFactory factory = builder.build(); ThreadPoolExecutor threadPool = - (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory); + (ThreadPoolExecutor) Executors.newFixedThreadPool(maxThreads, factory); List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>> (nbFiles); // Split each store file. @@ -881,7 +893,12 @@ public class SplitTransaction { return new Pair<Integer, Integer>(created_a, created_b); } - private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf) throws IOException { + private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf) + throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Splitting started for store file: " + sf.getPath() + " for region: " + + this.parent); + } HRegionFileSystem fs = this.parent.getRegionFileSystem(); String familyName = Bytes.toString(family); Path path_a = @@ -890,6 +907,10 @@ Path path_b = fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true, this.parent.getSplitPolicy()); + if (LOG.isDebugEnabled()) { + LOG.debug("Splitting complete for store file: " + sf.getPath() + " for region: " + + this.parent); + } return new Pair<Path, Path>(path_a, path_b); }