Index: hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java =================================================================== --- hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java (revision 1393233) +++ hbase-server/src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java (working copy) @@ -22,11 +22,20 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; @@ -43,6 +52,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.zookeeper.KeeperException; +import org.apache.hadoop.hbase.util.Threads; /** * Handler to create a table. 
@@ -122,7 +132,7 @@ if (cpHost != null) { cpHost.preCreateTableHandler(this.hTableDescriptor, this.newRegions); } - handleCreateTable(); + handleCreateTable(tableName); if (cpHost != null) { cpHost.postCreateTableHandler(this.hTableDescriptor, this.newRegions); } @@ -133,34 +143,47 @@ } } - private void handleCreateTable() throws IOException, KeeperException { - + private void handleCreateTable(String tableName) throws IOException, + KeeperException { + int regionNumber = newRegions.length; + ThreadPoolExecutor regionOpenAndInitializedThreadPool = getRegionOpenAndInitializedThreadPool( + "RegionOpenAndInitlizeThread-" + tableName, regionNumber); + CompletionService completionService = new ExecutorCompletionService( + regionOpenAndInitializedThreadPool); // TODO: Currently we make the table descriptor and as side-effect the // tableDir is created. Should we change below method to be createTable // where we create table in tmp dir with its table descriptor file and then // do rename to move it into place? FSTableDescriptors.createTableDescriptor(this.hTableDescriptor, this.conf); - List regionInfos = new ArrayList(); - final int batchSize = - this.conf.getInt("hbase.master.createtable.batchsize", 100); - for (int regionIdx = 0; regionIdx < this.newRegions.length; regionIdx++) { - HRegionInfo newRegion = this.newRegions[regionIdx]; - // 1. Create HRegion - HRegion region = HRegion.createHRegion(newRegion, - this.fileSystemManager.getRootDir(), this.conf, - this.hTableDescriptor, null, false, true); + for (final HRegionInfo newRegion : newRegions) { + completionService.submit(new Callable() { + public HRegion call() throws IOException { - regionInfos.add(region.getRegionInfo()); - if (regionIdx % batchSize == 0) { - // 2. Insert into META - MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos); - regionInfos.clear(); - } + // 2. 
Create HRegion + HRegion region = HRegion.createHRegion(newRegion, + fileSystemManager.getRootDir(), conf, hTableDescriptor, null, + false, true); - // 3. Close the new region to flush to disk. Close log file too. - region.close(); + // 3. Close the new region to flush to disk. Close log file too. + region.close(); + return region; + } + }); } + try { + for (int i = 0; i < regionNumber; i++) { + Future future = completionService.take(); + HRegion region = future.get(); + regionInfos.add(region.getRegionInfo()); + } + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } finally { + regionOpenAndInitializedThreadPool.shutdownNow(); + } if (regionInfos.size() > 0) { MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos); } @@ -184,4 +207,27 @@ " enabled because of a ZooKeeper issue", e); } } + + protected ThreadPoolExecutor getRegionOpenAndInitializedThreadPool( + final String threadNamePrefix, int regionNumber) { + int maxThreads = Math.min(regionNumber, conf.getInt( + HConstants.HBASE_HREGION_OPEN_AND_INIT_THREADS_MAX, + HConstants.DEFAULT_HBASE_HREGION_OPEN_AND_INIT_THREADS_MAX)); + return getRegionOpenAndInitializedThreadPool(maxThreads, threadNamePrefix); + } + + private ThreadPoolExecutor getRegionOpenAndInitializedThreadPool( + int maxThreads, final String threadNamePrefix) { + ThreadPoolExecutor openAndInitializeThreadPool = Threads + .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, + new ThreadFactory() { + private int count = 1; + + public Thread newThread(Runnable r) { + Thread t = new Thread(r, threadNamePrefix + "-" + count++); + return t; + } + }); + return openAndInitializeThreadPool; + } } Index: hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1393233) +++ 
hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -261,7 +261,17 @@ * closing stores or store files in parallel */ public static final int DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX = 1; + /** + * The max number of threads used for opening and initializing regions in + * parallel + */ + public static final String HBASE_HREGION_OPEN_AND_INIT_THREADS_MAX = "hbase.hregion.open.and.init.threads.max"; + /** + * The default number for the max number of threads used for opening and + * initializing regions in parallel + */ + public static final int DEFAULT_HBASE_HREGION_OPEN_AND_INIT_THREADS_MAX = 10; /** Conf key for the memstore size at which we flush the memstore */ public static final String HREGION_MEMSTORE_FLUSH_SIZE =