.../hadoop/hbase/regionserver/ChunkCreator.java | 71 +++++++++++++++++++--- 1 file changed, 61 insertions(+), 10 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java index 7e5395c..c54b4b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java @@ -18,15 +18,21 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; import java.lang.ref.WeakReference; import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -35,11 +41,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.StringUtils; - import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.StringUtils; /** * Does the management of memstoreLAB chunk creations. A monotonically incrementing id is associated @@ -87,7 +92,11 @@ public class ChunkCreator { float initialCountPercentage, HeapMemoryManager heapMemoryManager) { this.chunkSize = chunkSize; this.offheap = offheap; - this.pool = initializePool(globalMemStoreSize, poolSizePercentage, initialCountPercentage); + try { + this.pool = initializePool(globalMemStoreSize, poolSizePercentage, initialCountPercentage); + } catch (Exception e) { + throw new RuntimeException(e); + } if (heapMemoryManager != null && this.pool != null) { // Register with Heap Memory manager heapMemoryManager.registerTuneObserver(this.pool); @@ -257,16 +266,34 @@ public class ChunkCreator { private final AtomicLong chunkCount = new AtomicLong(); private final AtomicLong reusedChunkCount = new AtomicLong(); - MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) { + MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) throws IOException { this.maxCount = maxCount; this.poolSizePercentage = poolSizePercentage; this.reclaimedChunks = new LinkedBlockingQueue<>(); - for (int i = 0; i < initialCount; i++) { - Chunk chunk = createChunk(true); - chunk.init(); - reclaimedChunks.add(chunk); + int threadCount = getThreadCount(); + ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); + int perThreadCount = (int)Math.floor((double) (initialCount) / threadCount); + int lastThreadCount = initialCount - (perThreadCount * (threadCount - 1)); + Future[] futures = new Future[threadCount]; + try { + for (int i = 0; i < threadCount; i++) { + int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount; + futures[i] = service.submit(new ChunkCreatorCallable(buffersToCreate)); + } + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Buffer creation interrupted", e); + throw new IOException(e); + } + } + } finally { + service.shutdownNow(); } chunkCount.set(initialCount); + LOG.info("The number of initial reclaimed chunks available "+reclaimedChunks.size()); final String n = Thread.currentThread().getName(); scheduleThreadPool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setNameFormat(n + "-MemStoreChunkPool Statistics").setDaemon(true).build()); @@ -274,6 +301,30 @@ public class ChunkCreator { statThreadPeriod, TimeUnit.SECONDS); } + private class ChunkCreatorCallable implements Callable { + private final int bufferCount; + + ChunkCreatorCallable(int bufferCount) { + this.bufferCount = bufferCount; + } + + @Override + public Void call() throws Exception { + Chunk[] chunks = new Chunk[this.bufferCount]; + for (int i = 0; i < this.bufferCount; i++) { + chunks[i] = createChunk(true); + chunks[i].init(); + reclaimedChunks.add(chunks[i]); + } + return null; + } + } + + @VisibleForTesting + int getThreadCount() { + return Runtime.getRuntime().availableProcessors(); + } + /** * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have * not yet created max allowed chunks count. When we have already created max allowed chunks and @@ -393,7 +444,7 @@ public class ChunkCreator { } private MemStoreChunkPool initializePool(long globalMemStoreSize, float poolSizePercentage, - float initialCountPercentage) { + float initialCountPercentage) throws IOException { if (poolSizePercentage <= 0) { LOG.info("PoolSizePercentage is less than 0. So not using pool"); return null;