From f9723928bbd2b2cfd87341e5858e1261ab83b913 Mon Sep 17 00:00:00 2001
From: liyintang
Date: Wed, 28 Dec 2011 12:56:05 -0800
Subject: [PATCH] [HBASE-5033] Opening/Closing store in parallel to reduce region open/close time

---
 .../java/org/apache/hadoop/hbase/HConstants.java   |  19 +++-
 .../apache/hadoop/hbase/regionserver/HRegion.java  | 134 +++++++++++++++++---
 .../apache/hadoop/hbase/regionserver/Store.java    | 107 +++++++++++++---
 .../java/org/apache/hadoop/hbase/util/Threads.java |  32 ++++-
 4 files changed, 250 insertions(+), 42 deletions(-)

diff --git src/main/java/org/apache/hadoop/hbase/HConstants.java src/main/java/org/apache/hadoop/hbase/HConstants.java
index 5120a3c..904e2d2 100644
--- src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -19,15 +19,15 @@
  */
 package org.apache.hadoop.hbase;
 
-import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.util.Bytes;
-
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 import java.util.regex.Pattern;
 
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+
 /**
  * HConstants holds a bunch of HBase-related constants
  */
@@ -218,6 +218,19 @@ public final class HConstants {
   public static final String HREGION_MAX_FILESIZE =
       "hbase.hregion.max.filesize";
 
+  /**
+   * The max number of threads used for opening and closing stores or store
+   * files in parallel
+   */
+  public static final String HSTORE_OPEN_AND_CLOSE_THREADS_MAX =
+    "hbase.hstore.open.and.close.threads.max";
+
+  /**
+   * The default number for the max number of threads used for opening and
+   * closing stores or store files in parallel
+   */
+  public static final int DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX = 1;
+
   /** Default maximum file size */
   public static final long DEFAULT_MAX_FILE_SIZE = 256 * 1024 * 1024;
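The new knob defaults to 1, so region open/close stays sequential unless an
operator raises the limit. A minimal sketch of turning it up programmatically
(the class name ParallelStoreConfig is illustrative, not part of the patch;
the same value can be set as hbase.hstore.open.and.close.threads.max in
hbase-site.xml):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    public class ParallelStoreConfig {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Default is 1 (DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX), which
        // keeps the old sequential behavior; raise it to open/close stores
        // and store files in parallel.
        conf.setInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX, 8);
        System.out.println(conf.getInt(
            HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX, 1)); // prints 8
      }
    }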
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 354ae4e..48f6d77 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -40,10 +40,17 @@ import java.util.NavigableSet;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -106,6 +113,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HashedBytes;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.io.Writable;
@@ -114,6 +122,7 @@ import org.cliffc.high_scale_lib.Counter;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ClassToInstanceMap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.MutableClassToInstanceMap;
@@ -433,7 +442,7 @@ public class HRegion implements HeapSize { // , Writable{
    * @see HRegion#newHRegion(Path, HLog, FileSystem, Configuration, HRegionInfo, HTableDescriptor, RegionServerServices)
    */
   public HRegion(Path tableDir, HLog log, FileSystem fs, Configuration conf,
-      HRegionInfo regionInfo, final HTableDescriptor htd,
+      final HRegionInfo regionInfo, final HTableDescriptor htd,
       RegionServerServices rsServices) {
     this.tableDir = tableDir;
     this.comparator = regionInfo.getComparator();
@@ -542,20 +551,49 @@ public class HRegion implements HeapSize { // , Writable{
     long maxSeqId = -1;
     // initialized to -1 so that we pick up MemstoreTS from column families
     long maxMemstoreTS = -1;
-    for (HColumnDescriptor c : this.htableDescriptor.getFamilies()) {
-      status.setStatus("Instantiating store for column family " + c);
-      Store store = instantiateHStore(this.tableDir, c);
-      this.stores.put(c.getName(), store);
-      long storeSeqId = store.getMaxSequenceId();
-      if (minSeqId == -1 || storeSeqId < minSeqId) {
-        minSeqId = storeSeqId;
-      }
-      if (maxSeqId == -1 || storeSeqId > maxSeqId) {
-        maxSeqId = storeSeqId;
+
+    if (this.htableDescriptor != null &&
+        !htableDescriptor.getFamilies().isEmpty()) {
+      // initialize the thread pool for opening stores in parallel.
+      ThreadPoolExecutor storeOpenerThreadPool =
+        getStoreOpenAndCloseThreadPool(
+          "StoreOpenerThread-" + this.regionInfo.getRegionNameAsString());
+      CompletionService<Store> completionService =
+        new ExecutorCompletionService<Store>(storeOpenerThreadPool);
+
+      // initialize each store in parallel
+      for (final HColumnDescriptor family : htableDescriptor.getFamilies()) {
+        status.setStatus("Instantiating store for column family " + family);
+        completionService.submit(new Callable<Store>() {
+          public Store call() throws IOException {
+            return instantiateHStore(tableDir, family);
+          }
+        });
       }
-      long maxStoreMemstoreTS = store.getMaxMemstoreTS();
-      if (maxStoreMemstoreTS > maxMemstoreTS) {
-        maxMemstoreTS = maxStoreMemstoreTS;
+      try {
+        for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
+          Future<Store> future = completionService.take();
+          Store store = future.get();
+
+          this.stores.put(store.getColumnFamilyName().getBytes(), store);
+          long storeSeqId = store.getMaxSequenceId();
+          if (minSeqId == -1 || storeSeqId < minSeqId) {
+            minSeqId = storeSeqId;
+          }
+          if (maxSeqId == -1 || storeSeqId > maxSeqId) {
+            maxSeqId = storeSeqId;
+          }
+          long maxStoreMemstoreTS = store.getMaxMemstoreTS();
+          if (maxStoreMemstoreTS > maxMemstoreTS) {
+            maxMemstoreTS = maxStoreMemstoreTS;
+          }
+        }
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e.getCause());
+      } finally {
+        storeOpenerThreadPool.shutdownNow();
       }
     }
     mvcc.initialize(maxMemstoreTS + 1);
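The store-opening hunk above is a submit-then-drain CompletionService idiom:
one Callable per column family, then exactly as many take()/get() calls as
submissions. Because take() yields futures in completion order rather than
submission order, the result is keyed into the stores map by each store's own
column family name instead of by loop index. A self-contained sketch of the
same idiom under simplified types (ParallelOpenSketch and openOne are
hypothetical stand-ins, not patch code):

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;

    public class ParallelOpenSketch {
      // Hypothetical stand-in for instantiateHStore(tableDir, family).
      static String openOne(String family) { return "store-" + family; }

      static void openAll(List<String> families, ExecutorService pool)
          throws IOException {
        CompletionService<String> cs =
            new ExecutorCompletionService<String>(pool);
        for (final String f : families) {
          cs.submit(new Callable<String>() {
            public String call() { return openOne(f); }
          });
        }
        try {
          // One take() per submission; completion order is arbitrary.
          for (int i = 0; i < families.size(); i++) {
            System.out.println("opened " + cs.take().get());
          }
        } catch (InterruptedException e) {
          throw new IOException(e);
        } catch (ExecutionException e) {
          throw new IOException(e.getCause()); // unwrap the worker's failure
        } finally {
          pool.shutdownNow(); // mirror the patch: always reclaim the pool
        }
      }
    }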
@@ -883,8 +921,38 @@ public class HRegion implements HeapSize { // , Writable{
     }
 
     List<StoreFile> result = new ArrayList<StoreFile>();
-    for (Store store : stores.values()) {
-      result.addAll(store.close());
+    if (!stores.isEmpty()) {
+      // initialize the thread pool for closing stores in parallel.
+      ThreadPoolExecutor storeCloserThreadPool =
+        getStoreOpenAndCloseThreadPool("StoreCloserThread-"
+          + this.regionInfo.getRegionNameAsString());
+      CompletionService<ImmutableList<StoreFile>> completionService =
+        new ExecutorCompletionService<ImmutableList<StoreFile>>(
+          storeCloserThreadPool);
+
+      // close each store in parallel
+      for (final Store store : stores.values()) {
+        completionService
+            .submit(new Callable<ImmutableList<StoreFile>>() {
+              public ImmutableList<StoreFile> call() throws IOException {
+                return store.close();
+              }
+            });
+      }
+      try {
+        for (int i = 0; i < stores.size(); i++) {
+          Future<ImmutableList<StoreFile>> future = completionService
+              .take();
+          ImmutableList<StoreFile> storeFileList = future.get();
+          result.addAll(storeFileList);
+        }
+      } catch (InterruptedException e) {
+        throw new IOException(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e.getCause());
+      } finally {
+        storeCloserThreadPool.shutdownNow();
+      }
     }
 
     this.closed.set(true);
@@ -900,6 +968,40 @@ public class HRegion implements HeapSize { // , Writable{
     }
   }
 
+  protected ThreadPoolExecutor getStoreOpenAndCloseThreadPool(
+      final String threadNamePrefix) {
+    int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
+    int maxThreads = Math.min(numStores,
+        conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
+            HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX));
+    return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
+  }
+
+  protected ThreadPoolExecutor getStoreFileOpenAndCloseThreadPool(
+      final String threadNamePrefix) {
+    int numStores = Math.max(1, this.htableDescriptor.getFamilies().size());
+    int maxThreads = Math.max(1,
+        conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
+            HConstants.DEFAULT_HSTORE_OPEN_AND_CLOSE_THREADS_MAX)
+            / numStores);
+    return getOpenAndCloseThreadPool(maxThreads, threadNamePrefix);
+  }
+
+  private ThreadPoolExecutor getOpenAndCloseThreadPool(int maxThreads,
+      final String threadNamePrefix) {
+    ThreadPoolExecutor openAndCloseThreadPool = Threads
+        .getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
+            new ThreadFactory() {
+              private int count = 1;
+
+              public Thread newThread(Runnable r) {
+                Thread t = new Thread(r, threadNamePrefix + "-" + count++);
+                return t;
+              }
+            });
+    return openAndCloseThreadPool;
+  }
+
   /**
    * @return True if its worth doing a flush before we put up the close flag.
    */
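The two sizing helpers above split one configured thread budget across two
levels: the store-level pool is capped at min(numStores, configuredMax),
while the store-file-level pool each Store borrows from its region is capped
at configuredMax / numStores (floored at 1), so the per-region total stays
near the configured cap even when both levels run at once. A worked example
of that arithmetic (PoolSizingSketch and the numbers are hypothetical):

    public class PoolSizingSketch {
      public static void main(String[] args) {
        int configuredMax = 8; // hbase.hstore.open.and.close.threads.max
        int numStores = 4;     // column families in the table

        // Store-level pool: never more threads than there are stores.
        int storeThreads = Math.min(numStores, configuredMax);         // 4
        // Store-file-level pool: each store gets a slice of the budget.
        int storeFileThreads = Math.max(1, configuredMax / numStores); // 2

        // Worst case: 4 stores opening concurrently, each with 2 store-file
        // threads, i.e. about 8 threads total, matching the configured cap.
        System.out.println(storeThreads + " stores x "
            + storeFileThreads + " file threads");
      }
    }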
diff --git src/main/java/org/apache/hadoop/hbase/regionserver/Store.java src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index b928731..636e533 100644
--- src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -27,7 +27,13 @@ import java.util.Collections;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.SortedSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -271,38 +277,73 @@ public class Store extends SchemaConfigured implements HeapSize {
     return homedir;
   }
 
-  /*
-   * Creates an unsorted list of StoreFile loaded from the given directory.
+  /**
+   * Creates an unsorted list of StoreFile loaded in parallel
+   * from the given directory.
    * @throws IOException
    */
-  private List<StoreFile> loadStoreFiles()
-  throws IOException {
+  private List<StoreFile> loadStoreFiles() throws IOException {
     ArrayList<StoreFile> results = new ArrayList<StoreFile>();
     FileStatus files[] = FSUtils.listStatus(this.fs, this.homedir, null);
-    for (int i = 0; files != null && i < files.length; i++) {
+
+    if (files == null || files.length == 0) {
+      return results;
+    }
+    // initialize the thread pool for opening store files in parallel.
+    ThreadPoolExecutor storeFileOpenerThreadPool =
+      this.region.getStoreFileOpenAndCloseThreadPool("StoreFileOpenerThread-"
+        + this.family.getNameAsString());
+    CompletionService<StoreFile> completionService =
+      new ExecutorCompletionService<StoreFile>(storeFileOpenerThreadPool);
+
+    int totalValidStoreFile = 0;
+    for (int i = 0; i < files.length; i++) {
       // Skip directories.
       if (files[i].isDir()) {
         continue;
       }
-      Path p = files[i].getPath();
-      // Check for empty file.  Should never be the case but can happen
+      final Path p = files[i].getPath();
+      // Check for empty file. Should never be the case but can happen
       // after data loss in hdfs for whatever reason (upgrade, etc.): HBASE-646
       if (this.fs.getFileStatus(p).getLen() <= 0) {
         LOG.warn("Skipping " + p + " because its empty. HBASE-646 DATA LOSS?");
         continue;
       }
-      StoreFile curfile = new StoreFile(fs, p, this.conf, this.cacheConf,
-          this.family.getBloomFilterType());
-      passSchemaMetricsTo(curfile);
-      curfile.createReader();
-      long length = curfile.getReader().length();
-      this.storeSize += length;
-      this.totalUncompressedBytes += curfile.getReader().getTotalUncompressedBytes();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("loaded " + curfile.toStringDetailed());
+
+      // open each store file in parallel
+      completionService.submit(new Callable<StoreFile>() {
+        public StoreFile call() throws IOException {
+          StoreFile storeFile = new StoreFile(fs, p, conf, cacheConf,
+              family.getBloomFilterType());
+          passSchemaMetricsTo(storeFile);
+          storeFile.createReader();
+          return storeFile;
+        }
+      });
+      totalValidStoreFile++;
+    }
+
+    try {
+      for (int i = 0; i < totalValidStoreFile; i++) {
+        Future<StoreFile> future = completionService.take();
+        StoreFile storeFile = future.get();
+        long length = storeFile.getReader().length();
+        this.storeSize += length;
+        this.totalUncompressedBytes +=
+          storeFile.getReader().getTotalUncompressedBytes();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("loaded " + storeFile.toStringDetailed());
+        }
+        results.add(storeFile);
       }
-      results.add(curfile);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } finally {
+      storeFileOpenerThreadPool.shutdownNow();
     }
+
     return results;
   }
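A subtlety in the loadStoreFiles() hunk above: directories and empty files
are skipped before submission, so the drain loop is bounded by
totalValidStoreFile, the count of tasks actually submitted, not by
files.length; draining files.length futures would block forever on take().
A runnable toy of the same bookkeeping with made-up file lengths
(DrainCountSketch is illustrative, not patch code):

    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class DrainCountSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<Integer> cs =
            new ExecutorCompletionService<Integer>(pool);
        int[] lengths = {10, 0, 7, 0, 3}; // 0 stands for an empty store file
        int submitted = 0;
        for (final int len : lengths) {
          if (len <= 0) continue;        // skipped entries never submitted
          cs.submit(new Callable<Integer>() {
            public Integer call() { return len; }
          });
          submitted++;                   // count only what was submitted...
        }
        long total = 0;
        for (int i = 0; i < submitted; i++) { // ...so the drain terminates
          total += cs.take().get();
        }
        pool.shutdown();
        System.out.println("total length = " + total); // 20
      }
    }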
@@ -499,8 +540,36 @@ public class Store extends SchemaConfigured implements HeapSize {
       // Clear so metrics doesn't find them.
       storefiles = ImmutableList.of();
 
-      for (StoreFile f: result) {
-        f.closeReader(true);
+      if (!result.isEmpty()) {
+        // initialize the thread pool for closing store files in parallel.
+        ThreadPoolExecutor storeFileCloserThreadPool = this.region
+            .getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
+                + this.family.getNameAsString());
+
+        // close each store file in parallel
+        CompletionService<Void> completionService =
+          new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
+        for (final StoreFile f : result) {
+          completionService.submit(new Callable<Void>() {
+            public Void call() throws IOException {
+              f.closeReader(true);
+              return null;
+            }
+          });
+        }
+
+        try {
+          for (int i = 0; i < result.size(); i++) {
+            Future<Void> future = completionService.take();
+            future.get();
+          }
+        } catch (InterruptedException e) {
+          throw new IOException(e);
+        } catch (ExecutionException e) {
+          throw new IOException(e.getCause());
+        } finally {
+          storeFileCloserThreadPool.shutdownNow();
+        }
       }
       LOG.debug("closed " + this.storeNameStr);
       return result;

diff --git src/main/java/org/apache/hadoop/hbase/util/Threads.java src/main/java/org/apache/hadoop/hbase/util/Threads.java
index 6f81b62..ce880a5 100644
--- src/main/java/org/apache/hadoop/hbase/util/Threads.java
+++ src/main/java/org/apache/hadoop/hbase/util/Threads.java
@@ -19,14 +19,17 @@
  */
 package org.apache.hadoop.hbase.util;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import java.io.PrintWriter;
-import org.apache.hadoop.util.ReflectionUtils;
-
 import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ReflectionUtils;
+
 /**
  * Thread Utility
  */
@@ -152,4 +155,25 @@ public class Threads {
     }
   }
 
+  /**
+   * Create a new CachedThreadPool with a bounded number as the maximum
+   * thread size in the pool.
+   *
+   * @param maxCachedThread the maximum number of threads that can be created
+   *          in the pool
+   * @param timeout the maximum time an idle thread is kept alive
+   * @param unit the time unit of the timeout argument
+   * @param threadFactory the factory to use when creating new threads
+   * @return a cachedThreadPool with a bounded maximum number of threads
+   */
+  public static ThreadPoolExecutor getBoundedCachedThreadPool(
+      int maxCachedThread, long timeout, TimeUnit unit,
+      ThreadFactory threadFactory) {
+    ThreadPoolExecutor boundedCachedThreadPool =
+      new ThreadPoolExecutor(maxCachedThread, maxCachedThread, timeout,
+        unit, new LinkedBlockingQueue<Runnable>(), threadFactory);
+    // allow the core pool threads to time out and terminate
+    boundedCachedThreadPool.allowCoreThreadTimeOut(true);
+    return boundedCachedThreadPool;
+  }
 }
-- 
1.7.4
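For reference, the new Threads.getBoundedCachedThreadPool utility behaves
like a cached pool with a hard upper bound: core and max sizes are equal, the
work queue is unbounded, and allowCoreThreadTimeOut(true) lets idle workers
exit after the keep-alive, which is why HRegion can create a pool per
open/close and discard it with shutdownNow() afterwards. A usage sketch
(BoundedPoolDemo and the thread names are illustrative):

    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hbase.util.Threads;

    public class BoundedPoolDemo {
      public static void main(String[] args) {
        // At most 4 threads, idle ones reclaimed after 30 seconds.
        ThreadPoolExecutor pool = Threads.getBoundedCachedThreadPool(
            4, 30L, TimeUnit.SECONDS, new ThreadFactory() {
              private int count = 1;
              public Thread newThread(Runnable r) {
                return new Thread(r, "demo-" + count++);
              }
            });
        for (int i = 0; i < 10; i++) {
          final int id = i;
          pool.submit(new Runnable() {
            public void run() {
              System.out.println("task " + id + " on "
                  + Thread.currentThread().getName());
            }
          });
        }
        pool.shutdown(); // queued tasks finish; no worker lingers past 30s
      }
    }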