.../apache/hadoop/hbase/util/ByteBufferArray.java | 82 ++++++++++++++++++++-- 1 file changed, 76 insertions(+), 6 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java index 2bb820e..315ae7b 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java @@ -20,6 +20,15 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -62,13 +71,74 @@ public final class ByteBufferArray { + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count=" + bufferCount + ", direct=" + directByteBuffer); buffers = new ByteBuffer[bufferCount + 1]; - for (int i = 0; i <= bufferCount; i++) { - if (i < bufferCount) { - buffers[i] = allocator.allocate(bufferSize, directByteBuffer); - } else { - // always create on heap - buffers[i] = ByteBuffer.allocate(0); + // should we make this configurable?? + int coreThreadCount = Runtime.getRuntime().availableProcessors(); + ExecutorService service = new ThreadPoolExecutor(coreThreadCount, coreThreadCount, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); + int perThreadCount = (bufferCount) / coreThreadCount; + int remaining = (bufferCount) % coreThreadCount; + try { + List> futures = new ArrayList>(bufferCount); + for (int i = 0; i < coreThreadCount; i++) { + int startIndex, endIndex = -1; + if (i < coreThreadCount) { + startIndex = i * perThreadCount; + endIndex = perThreadCount + startIndex; + } else { + startIndex = endIndex; + endIndex = startIndex + remaining; + } + futures.add(service.submit(new BufferCreatorCallable(allocator, bufferSize, + directByteBuffer, startIndex, endIndex))); + } + // always create last buffer on heap + buffers[bufferCount] = ByteBuffer.allocate(0); + + if (!futures.isEmpty()) { + for (int i = 0; i < coreThreadCount; i++) { + try { + futures.get(i).get(); + } catch (InterruptedException e) { + LOG.error("Buffer creation interrupted", e); + throw new IOException(e); + } catch (ExecutionException e) { + LOG.error("Error while creating buffers", e); + throw new IOException(e); + } + } + } + } finally { + // shutdown the service + service.shutdown(); + } + } + + /** + * A callable that creates buffers of the specified length either onheap/offheap using the + * {@link ByteBufferAllocator} + */ + private class BufferCreatorCallable implements Callable { + final long length; + final ByteBufferAllocator allocator; + final boolean directByteBuffer; + final int startIndex; + final int endIndex; + + BufferCreatorCallable(ByteBufferAllocator allocator, long length, boolean directByteBuffer, + int startIndex, int endIndex) { + this.length = length; + this.allocator = allocator; + this.directByteBuffer = directByteBuffer; + this.startIndex = startIndex; + this.endIndex = endIndex; + } + + @Override + public Void call() throws Exception { + for (int i = startIndex; i < endIndex; i++) { + buffers[i] = allocator.allocate(length, directByteBuffer); } + return null; } }