.../hadoop/hbase/regionserver/ChunkCreator.java | 71 ++++++++++++++++++---- 1 file changed, 59 insertions(+), 12 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java index 4dd1207..42dd075 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ChunkCreator.java @@ -18,13 +18,19 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; import java.util.Map; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -34,11 +40,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.regionserver.HeapMemoryManager.HeapMemoryTuneObserver; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.util.StringUtils; - import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.StringUtils; /** * Does the management of memstoreLAB chunk creations. A monotonically incrementing id is associated @@ -73,7 +78,11 @@ public class ChunkCreator { float initialCountPercentage, HeapMemoryManager heapMemoryManager) { this.chunkSize = chunkSize; this.offheap = offheap; - this.pool = initializePool(globalMemStoreSize, poolSizePercentage, initialCountPercentage); + try { + this.pool = initializePool(globalMemStoreSize, poolSizePercentage, initialCountPercentage); + } catch (Exception e) { + throw new RuntimeException(e); + } if (heapMemoryManager != null && this.pool != null) { // Register with Heap Memory manager heapMemoryManager.registerTuneObserver(this.pool); @@ -222,16 +231,31 @@ public class ChunkCreator { private final AtomicLong chunkCount = new AtomicLong(); private final LongAdder reusedChunkCount = new LongAdder(); - MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) { + MemStoreChunkPool(int maxCount, int initialCount, float poolSizePercentage) throws IOException { this.maxCount = maxCount; this.poolSizePercentage = poolSizePercentage; this.reclaimedChunks = new LinkedBlockingQueue<>(); - for (int i = 0; i < initialCount; i++) { - // Chunks from pool are covered with strong references anyway - // TODO: change to CHUNK_MAP if it is generally defined - Chunk chunk = createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP); - chunk.init(); - reclaimedChunks.add(chunk); + int threadCount = getThreadCount(); + ExecutorService service = new ThreadPoolExecutor(threadCount, threadCount, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); + int perThreadCount = (int) Math.floor((double) (initialCount) / threadCount); + int lastThreadCount = initialCount - (perThreadCount * (threadCount - 1)); + Future[] futures = new Future[threadCount]; + try { + for (int i = 0; i < threadCount; i++) { + int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount; + futures[i] = service.submit(new ChunkCreatorCallable(buffersToCreate)); + } + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error("Buffer creation interrupted", e); + throw new IOException(e); + } + } + } finally { + service.shutdownNow(); } chunkCount.set(initialCount); final String n = Thread.currentThread().getName(); @@ -241,6 +265,29 @@ public class ChunkCreator { statThreadPeriod, TimeUnit.SECONDS); } + private class ChunkCreatorCallable implements Callable { + private final int bufferCount; + + ChunkCreatorCallable(int bufferCount) { + this.bufferCount = bufferCount; + } + + @Override + public Void call() throws Exception { + for (int i = 0; i < this.bufferCount; i++) { + Chunk chunk = createChunk(true, CompactingMemStore.IndexType.ARRAY_MAP); + chunk.init(); + reclaimedChunks.add(chunk); + } + return null; + } + } + + @VisibleForTesting + int getThreadCount() { + return Runtime.getRuntime().availableProcessors(); + } + /** * Poll a chunk from the pool, reset it if not null, else create a new chunk to return if we have * not yet created max allowed chunks count. When we have already created max allowed chunks and @@ -356,7 +403,7 @@ public class ChunkCreator { } private MemStoreChunkPool initializePool(long globalMemStoreSize, float poolSizePercentage, - float initialCountPercentage) { + float initialCountPercentage) throws IOException { if (poolSizePercentage <= 0) { LOG.info("PoolSizePercentage is less than 0. So not using pool"); return null;