### Eclipse Workspace Patch 1.0
#P apache94branch
Index: src/main/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/HConstants.java	(revision 1366989)
+++ src/main/java/org/apache/hadoop/hbase/HConstants.java	(working copy)
@@ -249,7 +249,19 @@
    */
   public static final int DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX = 1;
 
+  /**
+   * The max number of threads used for opening and initializing regions in
+   * parallel
+   */
+  public static final String HREGION_OPEN_AND_INITIALIZE_THREADS_MAX = "hbase.hregion.open.and.initialize.threads.max";
+
+  /**
+   * The default number for the max number of threads used for opening and
+   * initializing regions in parallel
+   */
+  public static final int DEFAULT_HREGION_OPEN_AND_INITIALIZE_THREADS_MAX = 10;
+
   /** Conf key for the memstore size at which we flush the memstore */
   public static final String HREGION_MEMSTORE_FLUSH_SIZE = "hbase.hregion.memstore.flush.size";
Index: src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java	(revision 1366989)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java	(working copy)
@@ -23,10 +23,19 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
@@ -41,8 +50,8 @@
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.zookeeper.KeeperException;
 
 /**
@@ -123,7 +132,7 @@
     String tableName = this.hTableDescriptor.getNameAsString();
     try {
       LOG.info("Attemping to create the table " + tableName);
-      handleCreateTable();
+      handleCreateTable(tableName);
     } catch (IOException e) {
       LOG.error("Error trying to create the table " + tableName, e);
     } catch (KeeperException e) {
@@ -131,44 +140,51 @@
     }
   }
 
-  private void handleCreateTable() throws IOException, KeeperException {
-
+  private void handleCreateTable(String tableName) throws IOException,
+      KeeperException {
+    int regionNumber = newRegions.length;
+    ThreadPoolExecutor regionOpenAndInitializedThreadPool = getRegionOpenAndInitializedThreadPool(
+        "RegionOpenAndInitializeThread-" + tableName, regionNumber);
+    CompletionService<HRegion> completionService = new ExecutorCompletionService<HRegion>(
+        regionOpenAndInitializedThreadPool);
     // TODO: Currently we make the table descriptor and as side-effect the
     // tableDir is created.  Should we change below method to be createTable
     // where we create table in tmp dir with its table descriptor file and then
     // do rename to move it into place?
     FSTableDescriptors.createTableDescriptor(this.hTableDescriptor, this.conf);
 
-    List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
-    final int batchSize =
-      this.conf.getInt("hbase.master.createtable.batchsize", 100);
-    HLog hlog = null;
-    for (int regionIdx = 0; regionIdx < this.newRegions.length; regionIdx++) {
-      HRegionInfo newRegion = this.newRegions[regionIdx];
-      // 1. Create HRegion
-      HRegion region = HRegion.createHRegion(newRegion,
-        this.fileSystemManager.getRootDir(), this.conf,
-        this.hTableDescriptor, hlog);
-      if (hlog == null) {
-        hlog = region.getLog();
-      }
+    for (final HRegionInfo newRegion : newRegions) {
+      completionService.submit(new Callable<HRegion>() {
+        public HRegion call() throws IOException {
-      regionInfos.add(region.getRegionInfo());
-      if (regionIdx % batchSize == 0) {
-        // 2. Insert into META
-        MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos);
-        regionInfos.clear();
-      }
+          // 1. Create HRegion
+          HRegion region = HRegion.createHRegion(newRegion,
+              fileSystemManager.getRootDir(), conf, hTableDescriptor, null);
 
-      // 3. Close the new region to flush to disk. Close log file too.
-      region.close();
+          // 2. Close the new region to flush to disk. Close log file too.
+          region.close();
+          return region;
+        }
+      });
     }
-    hlog.closeAndDelete();
+    List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
+    try {
+      for (int i = 0; i < regionNumber; i++) {
+        Future<HRegion> future = completionService.take();
+        HRegion region = future.get();
+        regionInfos.add(region.getRegionInfo());
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } finally {
+      regionOpenAndInitializedThreadPool.shutdownNow();
+    }
     if (regionInfos.size() > 0) {
       MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos);
     }
 
-    // 4. Trigger immediate assignment of the regions in round-robin fashion
+    // 3. Trigger immediate assignment of the regions in round-robin fashion
     List<ServerName> servers = serverManager.getOnlineServersList();
     // Remove the deadNotExpired servers from the server list.
     assignmentManager.removeDeadNotExpiredServers(servers);
@@ -180,7 +196,7 @@
       throw new IOException(ie);
     }
 
-    // 5. Set table enabled flag up in zk.
+    // 4. Set table enabled flag up in zk.
     try {
       assignmentManager.getZKTable().
         setEnabledTable(this.hTableDescriptor.getNameAsString());
@@ -189,4 +205,27 @@
         " enabled because of a ZooKeeper issue", e);
     }
   }
+
+  protected ThreadPoolExecutor getRegionOpenAndInitializedThreadPool(
+      final String threadNamePrefix, int regionNumber) {
+    int maxThreads = Math.min(regionNumber, conf.getInt(
+        HConstants.HREGION_OPEN_AND_INITIALIZE_THREADS_MAX,
+        HConstants.DEFAULT_HREGION_OPEN_AND_INITIALIZE_THREADS_MAX));
+    return getRegionOpenAndInitializedThreadPool(maxThreads, threadNamePrefix);
+  }
+
+  private ThreadPoolExecutor getRegionOpenAndInitializedThreadPool(
+      int maxThreads, final String threadNamePrefix) {
+    ThreadPoolExecutor openAndInitializeThreadPool = Threads
+        .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+            new ThreadFactory() {
+              private int count = 1;
+
+              public Thread newThread(Runnable r) {
+                Thread t = new Thread(r, threadNamePrefix + "-" + count++);
+                return t;
+              }
+            });
+    return openAndInitializeThreadPool;
+  }
 }
\ No newline at end of file
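
Note on the pattern the patch adopts: the CreateTableHandler change is a fan-out/collect idiom. Each new region is submitted to a bounded pool as a Callable, and an ExecutorCompletionService hands results back in completion order while task failures surface through Future.get(). The standalone sketch below (not part of the patch) illustrates the same idiom with plain JDK classes; the class name, the fixed-size pool, and the string payload are illustrative stand-ins for HBase's Threads.getBoundedCachedThreadPool and the HRegion results used in the real code.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelCreateSketch {
  public static void main(String[] args) throws IOException {
    int regionNumber = 20;                        // stand-in for newRegions.length
    int maxThreads = Math.min(regionNumber, 10);  // 10 mirrors the new default constant
    ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
    CompletionService<String> completionService =
        new ExecutorCompletionService<String>(pool);

    // Fan out: one task per "region", analogous to HRegion.createHRegion + region.close().
    for (int i = 0; i < regionNumber; i++) {
      final int regionIdx = i;
      completionService.submit(new Callable<String>() {
        public String call() {
          return "regioninfo-" + regionIdx;       // placeholder for region.getRegionInfo()
        }
      });
    }

    // Collect: take() blocks for the next finished task, regardless of submission order.
    List<String> regionInfos = new ArrayList<String>();
    try {
      for (int i = 0; i < regionNumber; i++) {
        Future<String> future = completionService.take();
        regionInfos.add(future.get());
      }
    } catch (InterruptedException e) {
      throw new IOException(e);
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    } finally {
      pool.shutdownNow();              // mirrors the patch's cleanup in its finally block
    }
    System.out.println("collected " + regionInfos.size() + " entries");
  }
}

Collecting with completionService.take() rather than iterating the submitted Futures in order means one slow region does not delay harvesting the regions that are already done, and shutting the pool down in finally releases the threads even when a task fails.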