.../apache/hadoop/hbase/util/ByteBufferArray.java | 75 ++++++++++++++++++++-- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 2 +- .../hbase/io/hfile/bucket/ByteBufferIOEngine.java | 2 +- 3 files changed, 71 insertions(+), 8 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java index 2bb820e..fdb4083 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java @@ -20,6 +20,15 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -29,6 +38,8 @@ import org.apache.hadoop.hbase.nio.MultiByteBuff; import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.util.StringUtils; +import sun.misc.VM; + /** * This class manages an array of ByteBuffers with a default size 4MB. 
These * buffers are sequential and could be considered as a large buffer.It supports @@ -61,14 +72,66 @@ public final class ByteBufferArray { LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity) + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count=" + bufferCount + ", direct=" + directByteBuffer); + if (directByteBuffer && capacity > VM.maxDirectMemory()) { + String msg = "Cannot create bytebuffers as the required memory= " + capacity + + " is greater than the maxDirect memory= " + VM.maxDirectMemory(); + LOG.error(msg); + throw new RuntimeException(msg); + } buffers = new ByteBuffer[bufferCount + 1]; - for (int i = 0; i <= bufferCount; i++) { - if (i < bufferCount) { - buffers[i] = allocator.allocate(bufferSize, directByteBuffer); - } else { - // always create on heap - buffers[i] = ByteBuffer.allocate(0); + // should we make this configurable?? + int coreThreadCount = Runtime.getRuntime().availableProcessors(); + ExecutorService service = new ThreadPoolExecutor(coreThreadCount, coreThreadCount, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + try { + List<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>(bufferCount); + for (int i = 0; i <= bufferCount; i++) { + if (i < bufferCount) { + futures.add( + service.submit(new BufferCreatorCallable(allocator, bufferSize, directByteBuffer))); + } else { + // always create on heap + buffers[i] = ByteBuffer.allocate(0); + } + } + + if (!futures.isEmpty()) { + for (int i = 0; i < bufferCount; i++) { + try { + buffers[i] = futures.get(i).get(); + } catch (InterruptedException e) { + LOG.error("Buffer creation interrupted", e); + throw new IOException(e); + } catch (ExecutionException e) { + LOG.error("Error while creating buffers", e); + throw new IOException(e); + } + } } + } finally { + // shutdown the service + service.shutdown(); + } + } + + /** + * A callable that creates buffers of the specified length either onheap/offheap + * using the {@link ByteBufferAllocator} + */ + private static class 
BufferCreatorCallable implements Callable<ByteBuffer> { + final long length; + final ByteBufferAllocator allocator; + final boolean directByteBuffer; + + BufferCreatorCallable(ByteBufferAllocator allocator, long length, boolean directByteBuffer) { + this.length = length; + this.allocator = allocator; + this.directByteBuffer = directByteBuffer; + } + + @Override + public ByteBuffer call() throws Exception { + return allocator.allocate(length, directByteBuffer); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 3c27f14..1c96e2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -329,7 +329,7 @@ public class BucketCache implements BlockCache, HeapSize { return new FileMmapEngine(ioEngineName.substring(5), capacity); } else { throw new IllegalArgumentException( - "Don't understand io engine name for cache - prefix with file:, heap or offheap"); + "Don't understand io engine name for cache - prefix with file:, mmap:, heap or offheap"); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java index 63de32c..92360d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java @@ -23,8 +23,8 @@ import java.nio.ByteBuffer; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.hfile.Cacheable; -import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; +import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer; import 
org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ByteBufferAllocator; import org.apache.hadoop.hbase.util.ByteBufferArray;