### Eclipse Workspace Patch 1.0
#P ApacheBranch92
Index: src/main/java/org/apache/hadoop/hbase/util/Threads.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/util/Threads.java	(revision 1366990)
+++ src/main/java/org/apache/hadoop/hbase/util/Threads.java	(working copy)
@@ -19,15 +19,18 @@
  */
 package org.apache.hadoop.hbase.util;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import java.io.PrintWriter;
-import org.apache.hadoop.util.ReflectionUtils;
-
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
 /**
  * Thread Utility
  */
@@ -174,4 +177,49 @@
       Thread.currentThread().interrupt();
     }
   }
+
+  /**
+   * Create a new CachedThreadPool with a bounded number as the maximum
+   * thread size in the pool.
+   *
+   * @param maxCachedThread
+   *          the maximum number of threads that can be created in the pool
+   * @param timeout
+   *          the maximum time that excess idle threads will wait for new tasks
+   * @param unit
+   *          the time unit of the timeout argument
+   * @param threadFactory
+   *          the factory to use when creating new threads
+   * @return threadPoolExecutor the cachedThreadPool with a bounded number
+   *         as the maximum thread size in the pool.
+   */
+  public static ThreadPoolExecutor getBoundedCachedThreadPool(
+      int maxCachedThread, long timeout, TimeUnit unit,
+      ThreadFactory threadFactory) {
+    ThreadPoolExecutor boundedCachedThreadPool = new ThreadPoolExecutor(
+        maxCachedThread, maxCachedThread, timeout, unit,
+        new LinkedBlockingQueue<Runnable>(), threadFactory);
+    // allow the core pool threads to time out and terminate
+    boundedCachedThreadPool.allowCoreThreadTimeOut(true);
+    return boundedCachedThreadPool;
+  }
+
+  /**
+   * Returns a {@link java.util.concurrent.ThreadFactory} that names each
+   * created thread uniquely, with a common prefix.
+   *
+   * @param prefix
+   *          The prefix of every created Thread's name
+   * @return a {@link java.util.concurrent.ThreadFactory} that names threads
+   */
+  public static ThreadFactory getNamedThreadFactory(final String prefix) {
+    return new ThreadFactory() {
+
+      private final AtomicInteger threadNumber = new AtomicInteger(1);
+
+      @Override
+      public Thread newThread(Runnable r) {
+        return new Thread(r, prefix + threadNumber.getAndIncrement());
+      }
+    };
+  }
 }
Index: src/main/java/org/apache/hadoop/hbase/HConstants.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/HConstants.java	(revision 1366990)
+++ src/main/java/org/apache/hadoop/hbase/HConstants.java	(working copy)
@@ -19,15 +19,15 @@
  */
 package org.apache.hadoop.hbase;
 
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.util.Bytes;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+
 /**
  * HConstants holds a bunch of HBase-related constants
  */
@@ -208,5 +208,17 @@
   /** Configuration key storing the cluster ID */
   public static final String CLUSTER_ID = "hbase.cluster.id";
 
+  /**
+   * The max number of threads used for opening and initializing regions in
+   * parallel
+   */
+  public static final String HREGION_OPEN_AND_INITIALIZE_THREADS_MAX = "hbase.hregion.open.and.initialize.threads.max";
+
+  /**
+   * The default number for the max number of threads used for opening and
+   * initializing regions in parallel
+   */
+  public static final int DEFAULT_HREGION_OPEN_AND_INITIALIZE_THREADS_MAX = 10;
+
   // Always store the location of the root table's HRegion.
   // This HRegion is never split.
Index: src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java
===================================================================
--- src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java	(revision 1366990)
+++ src/main/java/org/apache/hadoop/hbase/master/handler/CreateTableHandler.java	(working copy)
@@ -23,10 +23,19 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
@@ -41,8 +50,8 @@
 import org.apache.hadoop.hbase.master.MasterFileSystem;
 import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.zookeeper.KeeperException;
 
 /**
@@ -123,7 +132,7 @@
     String tableName = this.hTableDescriptor.getNameAsString();
     try {
       LOG.info("Attemping to create the table " + tableName);
-      handleCreateTable();
+      handleCreateTable(tableName);
     } catch (IOException e) {
       LOG.error("Error trying to create the table " + tableName, e);
     } catch (KeeperException e) {
@@ -131,42 +140,54 @@
     }
   }
 
-  private void handleCreateTable() throws IOException, KeeperException {
-
+  private void handleCreateTable(String tableName) throws IOException,
+      KeeperException {
+    int regionNumber = newRegions.length;
+    ThreadPoolExecutor regionOpenAndInitializedThreadPool = getRegionOpenAndInitializedThreadPool(
+        "RegionOpenAndInitializeThread-" + tableName, regionNumber);
+    CompletionService<HRegion> completionService = new ExecutorCompletionService<HRegion>(
+        regionOpenAndInitializedThreadPool);
     // TODO: Currently we make the table descriptor and as side-effect the
     // tableDir is created.  Should we change below method to be createTable
     // where we create table in tmp dir with its table descriptor file and then
     // do rename to move it into place?
     FSTableDescriptors.createTableDescriptor(this.hTableDescriptor, this.conf);
 
-    List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
-    final int batchSize =
-      this.conf.getInt("hbase.master.createtable.batchsize", 100);
-    HLog hlog = null;
-    for (int regionIdx = 0; regionIdx < this.newRegions.length; regionIdx++) {
-      HRegionInfo newRegion = this.newRegions[regionIdx];
-      // 1. Create HRegion
-      HRegion region = HRegion.createHRegion(newRegion,
-        this.fileSystemManager.getRootDir(), this.conf,
-        this.hTableDescriptor, hlog);
-      if (hlog == null) {
-        hlog = region.getLog();
-      }
+    // Open and initialize all the new regions in parallel; each task returns
+    // the closed region so its HRegionInfo can be inserted into META below.
+    for (final HRegionInfo newRegion : newRegions) {
+      completionService.submit(new Callable<HRegion>() {
+        public HRegion call() throws IOException {
-
-      regionInfos.add(region.getRegionInfo());
-      if (regionIdx % batchSize == 0) {
-        // 2. Insert into META
-        MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos);
-        regionInfos.clear();
-      }
+          // 1. Create HRegion
+          HRegion region = HRegion.createHRegion(newRegion,
+              fileSystemManager.getRootDir(), conf, hTableDescriptor, null);
-      // 3. Close the new region to flush to disk.  Close log file too.
-      region.close();
+          // 2. Close the new region to flush to disk. Close log file too.
+          region.close();
+          return region;
+        }
+      });
     }
-    hlog.closeAndDelete();
+    // Collect the HRegionInfo of every opened region; the declaration must
+    // stay since regionInfos is still used below.
+    List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>();
+    try {
+      for (int i = 0; i < regionNumber; i++) {
+        Future<HRegion> future = completionService.take();
+        HRegion region = future.get();
+        regionInfos.add(region.getRegionInfo());
+      }
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } finally {
+      regionOpenAndInitializedThreadPool.shutdownNow();
+    }
     if (regionInfos.size() > 0) {
       MetaEditor.addRegionsToMeta(this.catalogTracker, regionInfos);
     }
 
-    // 4. Trigger immediate assignment of the regions in round-robin fashion
+    // 3. Trigger immediate assignment of the regions in round-robin fashion
     List<ServerName> servers = serverManager.getOnlineServersList();
     // Remove the deadNotExpired servers from the server list.
     assignmentManager.removeDeadNotExpiredServers(servers);
@@ -180,7 +201,7 @@
       throw new IOException(ie);
     }
 
-    // 5. Set table enabled flag up in zk.
+    // 4. Set table enabled flag up in zk.
     try {
       assignmentManager.getZKTable().
         setEnabledTable(this.hTableDescriptor.getNameAsString());
@@ -189,4 +210,27 @@
         " enabled because of a ZooKeeper issue", e);
     }
   }
+
+  protected ThreadPoolExecutor getRegionOpenAndInitializedThreadPool(
+      final String threadNamePrefix, int regionNumber) {
+    int maxThreads = Math.min(regionNumber, conf.getInt(
+        HConstants.HREGION_OPEN_AND_INITIALIZE_THREADS_MAX,
+        HConstants.DEFAULT_HREGION_OPEN_AND_INITIALIZE_THREADS_MAX));
+    return getRegionOpenAndInitializedThreadPool(maxThreads, threadNamePrefix);
+  }
+
+  private ThreadPoolExecutor getRegionOpenAndInitializedThreadPool(
+      int maxThreads, final String threadNamePrefix) {
+    ThreadPoolExecutor openAndInitializeThreadPool = Threads
+        .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+            new ThreadFactory() {
+              private int count = 1;
+
+              public Thread newThread(Runnable r) {
+                Thread t = new Thread(r, threadNamePrefix + "-" + count++);
+                return t;
+              }
+            });
+    return openAndInitializeThreadPool;
+  }
 }
\ No newline at end of file