### Eclipse Workspace Patch 1.0 #P Apache90branch Index: src/main/java/org/apache/hadoop/hbase/master/HMaster.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/master/HMaster.java (revision 1366989) +++ src/main/java/org/apache/hadoop/hbase/master/HMaster.java (working copy) @@ -28,6 +28,14 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; @@ -43,8 +51,8 @@ import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; import org.apache.hadoop.hbase.PleaseHoldException; -import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableNotDisabledException; @@ -835,27 +843,52 @@ if(MetaReader.tableExists(catalogTracker, tableName)) { throw new TableExistsException(tableName); } - for (HRegionInfo newRegion : newRegions) { - // 1. Set table enabling flag up in zk. 
- try { - assignmentManager.getZKTable().setEnabledTable(tableName); - } catch (KeeperException e) { - throw new IOException("Unable to ensure that the table will be" + - " enabled because of a ZooKeeper issue", e); - } + int regionNumber = newRegions.length; + ThreadPoolExecutor regionOpenAndInitializedThreadPool = getRegionOpenAndInitializedThreadPool( + "RegionOpenAndInitializeThread-" + tableName, regionNumber); + CompletionService<HRegion> completionService = new ExecutorCompletionService<HRegion>( + regionOpenAndInitializedThreadPool); + // 1. Set table enabling flag up in zk. + try { + assignmentManager.getZKTable().setEnablingTable(tableName); + } catch (KeeperException e) { + throw new IOException("Unable to ensure that the table will be" + + " enabled because of a ZooKeeper issue", e); + } + List<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>(); + for (final HRegionInfo newRegion : newRegions) { - // 2. Create HRegion - HRegion region = HRegion.createHRegion(newRegion, - fileSystemManager.getRootDir(), conf); + completionService.submit(new Callable<HRegion>() { + public HRegion call() throws IOException { - // 3. Insert into META - MetaEditor.addRegionToMeta(catalogTracker, region.getRegionInfo()); + // 2. Create HRegion + HRegion region = HRegion.createHRegion(newRegion, + fileSystemManager.getRootDir(), conf); - // 4. Close the new region to flush to disk. Close log file too. - region.close(); - region.getLog().closeAndDelete(); + // 3. Close the new region to flush to disk. Close log file too. + region.close(); + region.getLog().closeAndDelete(); + return region; + } + }); } + try { + for (int i = 0; i < regionNumber; i++) { + Future<HRegion> future = completionService.take(); + HRegion region = future.get(); + regionInfos.add(region.getRegionInfo()); + } + } catch (InterruptedException e) { + throw new IOException(e); + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } finally { + regionOpenAndInitializedThreadPool.shutdownNow(); + } + // 4. 
Insert into META + MetaEditor.addRegionsToMeta(catalogTracker, regionInfos); + // 5. Trigger immediate assignment of the regions in round-robin fashion if (newRegions.length == 1) { this.assignmentManager.assign(newRegions[0], true); @@ -880,6 +913,29 @@ } } + protected ThreadPoolExecutor getRegionOpenAndInitializedThreadPool( + final String threadNamePrefix, int regionNumber) { + int maxThreads = Math.min(regionNumber, conf.getInt( + HConstants.HREGION_OPEN_AND_INITIALIZE_THREADS_MAX, + HConstants.DEFAULT_HREGION_OPEN_AND_INITIALIZE_THREADS_MAX)); + return getRegionOpenAndInitializedThreadPool(maxThreads, threadNamePrefix); + } + + private ThreadPoolExecutor getRegionOpenAndInitializedThreadPool( + int maxThreads, final String threadNamePrefix) { + ThreadPoolExecutor openAndInitializeThreadPool = Threads + .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS, + new ThreadFactory() { + private int count = 1; + + public Thread newThread(Runnable r) { + Thread t = new Thread(r, threadNamePrefix + "-" + count++); + return t; + } + }); + return openAndInitializeThreadPool; + } + private static boolean isCatalogTable(final byte [] tableName) { return Bytes.equals(tableName, HConstants.ROOT_TABLE_NAME) || Bytes.equals(tableName, HConstants.META_TABLE_NAME); Index: src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java (revision 1366989) +++ src/main/java/org/apache/hadoop/hbase/catalog/MetaEditor.java (working copy) @@ -21,6 +21,8 @@ import java.io.IOException; import java.net.ConnectException; +import java.util.ArrayList; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -60,6 +62,35 @@ } /** + * Adds a META row for each of the specified new regions. 
+ * + * @param catalogTracker + * CatalogTracker + * @param regionInfos + * region information list + * @throws IOException + * if problem connecting or updating meta + */ + public static void addRegionsToMeta(CatalogTracker catalogTracker, + List<HRegionInfo> regionInfos) throws IOException { + List<Put> puts = new ArrayList<Put>(); + for (HRegionInfo regionInfo : regionInfos) { + puts.add(makePutFromRegionInfo(regionInfo)); + } + catalogTracker.waitForMetaServerConnectionDefault().put( + CatalogTracker.META_REGION, puts); + LOG.info("Added " + puts.size() + " regions in META"); + } + + private static Put makePutFromRegionInfo(HRegionInfo regionInfo) + throws IOException { + Put put = new Put(regionInfo.getRegionName()); + put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, + Writables.getBytes(regionInfo)); + return put; + } + + /** * Offline parent in meta. * Used when splitting. * @param catalogTracker Index: src/main/java/org/apache/hadoop/hbase/HConstants.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/HConstants.java (revision 1366989) +++ src/main/java/org/apache/hadoop/hbase/HConstants.java (working copy) @@ -171,6 +171,18 @@ /** Maximum value length, enforced on KeyValue construction */ public static final int MAXIMUM_VALUE_LENGTH = Integer.MAX_VALUE; + /** + * The max number of threads used for opening and initializing regions in + * parallel + */ + public static final String HREGION_OPEN_AND_INITIALIZE_THREADS_MAX = "hbase.hregion.open.and.initialize.threads.max"; + + /** + * The default number for the max number of threads used for opening and + * initializing regions in parallel + */ + public static final int DEFAULT_HREGION_OPEN_AND_INITIALIZE_THREADS_MAX = 1; + // Always store the location of the root table's HRegion. // This HRegion is never split. 
Index: src/main/java/org/apache/hadoop/hbase/util/Threads.java =================================================================== --- src/main/java/org/apache/hadoop/hbase/util/Threads.java (revision 1366989) +++ src/main/java/org/apache/hadoop/hbase/util/Threads.java (working copy) @@ -19,13 +19,17 @@ */ package org.apache.hadoop.hbase.util; +import java.io.PrintWriter; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import java.io.PrintWriter; import org.apache.hadoop.util.ReflectionUtils; -import java.lang.Thread.UncaughtExceptionHandler; - /** * Thread Utility */ @@ -128,4 +132,30 @@ e.printStackTrace(); } } + + /** + * Create a new CachedThreadPool with a bounded number as the maximum thread + * size in the pool. + * + * @param maxCachedThread + * the maximum number of threads that can be created in the pool + * @param timeout + * the maximum time to wait + * @param unit + * the time unit of the timeout argument + * @param threadFactory + * the factory to use when creating new threads + * @return threadPoolExecutor the cachedThreadPool with a bounded number as + * the maximum thread size in the pool. + */ + public static ThreadPoolExecutor getBoundedCachedThreadPool( + int maxCachedThread, long timeout, TimeUnit unit, + ThreadFactory threadFactory) { + ThreadPoolExecutor boundedCachedThreadPool = new ThreadPoolExecutor( + maxCachedThread, maxCachedThread, timeout, unit, + new LinkedBlockingQueue<Runnable>(), threadFactory); + // allow the core pool threads to time out and terminate + boundedCachedThreadPool.allowCoreThreadTimeOut(true); + return boundedCachedThreadPool; + } }