.../apache/hadoop/hbase/util/ByteBufferArray.java | 66 ++++++++++++++++++++-- 1 file changed, 60 insertions(+), 6 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java index 2bb820e..f318ce6 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java @@ -20,6 +20,15 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -62,13 +71,58 @@ public final class ByteBufferArray { + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count=" + bufferCount + ", direct=" + directByteBuffer); buffers = new ByteBuffer[bufferCount + 1]; - for (int i = 0; i <= bufferCount; i++) { - if (i < bufferCount) { - buffers[i] = allocator.allocate(bufferSize, directByteBuffer); - } else { - // always create on heap - buffers[i] = ByteBuffer.allocate(0); + // should we make this configurable?? + ExecutorService service = new ThreadPoolExecutor(10, (int) (bufferCount * 0.25), 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); + try { + List> futures = new ArrayList>(bufferCount - 1); + for (int i = 0; i <= bufferCount; i++) { + if (i < bufferCount) { + futures.add( + service.submit(new BufferCreatorCallable(allocator, bufferSize, directByteBuffer))); + } else { + // always create on heap + buffers[i] = ByteBuffer.allocate(0); + } + } + + if (!futures.isEmpty()) { + for (int i = 0; i < bufferCount; i++) { + try { + buffers[i] = futures.get(i).get(); + } catch (InterruptedException e) { + LOG.error("Buffer creation interrupted", e); + throw new IOException(e); + } catch (ExecutionException e) { + LOG.error("Error while creating buffers", e); + throw new IOException(e); + } + } } + } finally { + // shutdown the service + service.shutdown(); + } + } + + /** + * A callable that creates buffers of the specified length either onheap/offheap + * using the {@link ByteBufferAllocator} + */ + private static class BufferCreatorCallable implements Callable { + final long length; + final ByteBufferAllocator allocator; + final boolean directByteBuffer; + + BufferCreatorCallable(ByteBufferAllocator allocator, long length, boolean directByteBuffer) { + this.length = length; + this.allocator = allocator; + this.directByteBuffer = directByteBuffer; + } + + @Override + public ByteBuffer call() throws Exception { + return allocator.allocate(length, directByteBuffer); } }