.../apache/hadoop/hbase/util/ByteBufferArray.java | 69 ++++++++++++++++++++-- .../apache/hadoop/hbase/util/ByteBufferUtils.java | 12 ++++ .../org/apache/hadoop/hbase/util/UnsafeAccess.java | 44 ++++++++++++++ .../hadoop/hbase/util/TestByteBufferUtils.java | 8 +++ .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 2 +- .../hbase/io/hfile/bucket/ByteBufferIOEngine.java | 3 +- 6 files changed, 130 insertions(+), 8 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java index 2bb820e..372ff57 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java @@ -20,6 +20,15 @@ package org.apache.hadoop.hbase.util; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -62,13 +71,61 @@ public final class ByteBufferArray { + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count=" + bufferCount + ", direct=" + directByteBuffer); buffers = new ByteBuffer[bufferCount + 1]; - for (int i = 0; i <= bufferCount; i++) { - if (i < bufferCount) { - buffers[i] = allocator.allocate(bufferSize, directByteBuffer); - } else { - // always create on heap - buffers[i] = ByteBuffer.allocate(0); + // should we make this configurable?? + int coreThreadCount = 10; + int maxThreadCount = + (int) (bufferCount * 0.25) < coreThreadCount ? 
coreThreadCount : (int) (bufferCount * 0.25); + ExecutorService service = new ThreadPoolExecutor(coreThreadCount, maxThreadCount, 0L, + TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); + try { + List<Future<ByteBuffer>> futures = new ArrayList<Future<ByteBuffer>>(bufferCount - 1); + for (int i = 0; i <= bufferCount; i++) { + if (i < bufferCount) { + futures.add( + service.submit(new BufferCreatorCallable(allocator, bufferSize, directByteBuffer))); + } else { + // always create on heap + buffers[i] = ByteBuffer.allocate(0); + } + } + + if (!futures.isEmpty()) { + for (int i = 0; i < bufferCount; i++) { + try { + buffers[i] = futures.get(i).get(); + } catch (InterruptedException e) { + LOG.error("Buffer creation interrupted", e); + throw new IOException(e); + } catch (ExecutionException e) { + LOG.error("Error while creating buffers", e); + throw new IOException(e); + } + } + } } + } finally { + // shutdown the service + service.shutdown(); + } + } + + /** + * A callable that creates buffers of the specified length either onheap/offheap + * using the {@link ByteBufferAllocator} + */ + private static class BufferCreatorCallable implements Callable<ByteBuffer> { + final long length; + final ByteBufferAllocator allocator; + final boolean directByteBuffer; + + BufferCreatorCallable(ByteBufferAllocator allocator, long length, boolean directByteBuffer) { + this.length = length; + this.allocator = allocator; + this.directByteBuffer = directByteBuffer; + } + + @Override + public ByteBuffer call() throws Exception { + return allocator.allocate(length, directByteBuffer); } } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java index 6dac300..3324f20 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferUtils.java @@ -1005,6 +1005,18 @@ public final class ByteBufferUtils { } /** + * Allocates a direct 
bytebuffer directly using Unsafe if available, if not uses java's nio Buffer + * way of direct byte buffer creation + **/ + public static ByteBuffer allocateDirectBuffer(int length) { + if (UNSAFE_AVAIL) { + return UnsafeAccess.allocate(length); + } else { + return ByteBuffer.allocateDirect(length); + } + } + + /** * Copies specified number of bytes from given offset of 'in' ByteBuffer to * the array. This doesn't affact the position of buffer. * @param out diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java index 9078def..0de19d9 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/UnsafeAccess.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.util; import java.lang.reflect.Field; +import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.security.AccessController; @@ -42,6 +43,11 @@ public final class UnsafeAccess { /** The offset to the first element in a byte array. 
*/ public static final long BYTE_ARRAY_BASE_OFFSET; + private static final Class<?> DIRECT_BYTE_BUFFER_CLASS; + private static final long DIRECT_BYTE_BUFFER_ADDRESS_OFFSET; + private static final long DIRECT_BYTE_BUFFER_CAPACITY_OFFSET; + private static final long DIRECT_BYTE_BUFFER_LIMIT_OFFSET; + static final boolean littleEndian = ByteOrder.nativeOrder() .equals(ByteOrder.LITTLE_ENDIAN); @@ -69,6 +75,20 @@ public final class UnsafeAccess { } else{ BYTE_ARRAY_BASE_OFFSET = -1; } + + try { + ByteBuffer directBuffer = ByteBuffer.allocateDirect(0); + Class<?> clazz = directBuffer.getClass(); + DIRECT_BYTE_BUFFER_ADDRESS_OFFSET = + theUnsafe.objectFieldOffset(Buffer.class.getDeclaredField("address")); + DIRECT_BYTE_BUFFER_CAPACITY_OFFSET = + theUnsafe.objectFieldOffset(Buffer.class.getDeclaredField("capacity")); + DIRECT_BYTE_BUFFER_LIMIT_OFFSET = + theUnsafe.objectFieldOffset(Buffer.class.getDeclaredField("limit")); + DIRECT_BYTE_BUFFER_CLASS = clazz; + } catch (NoSuchFieldException e) { + throw new RuntimeException(e); + } } private UnsafeAccess(){} @@ -103,6 +123,30 @@ public final class UnsafeAccess { } /** + * Creates a bytebuffer using the Unsafe API instead of using NIO's DirectByteBuffer APIs + * Note : Taken from https://github.com/snazy/ohc + * @param size the size of the buffer to be created + * @return the created bytebuffer + * + */ + public static ByteBuffer allocate(int size) { + long address = theUnsafe.allocateMemory(size); + ByteBuffer bb; + try { + bb = (ByteBuffer) theUnsafe.allocateInstance(DIRECT_BYTE_BUFFER_CLASS); + theUnsafe.putLong(bb, DIRECT_BYTE_BUFFER_ADDRESS_OFFSET, address); + theUnsafe.putInt(bb, DIRECT_BYTE_BUFFER_CAPACITY_OFFSET, (int) size); + theUnsafe.putInt(bb, DIRECT_BYTE_BUFFER_LIMIT_OFFSET, (int) size); + bb.order(ByteOrder.BIG_ENDIAN); + return bb; + } catch (Error e) { + throw e; + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + + /** * Converts a byte array to a long value considering it was written in 
big-endian format. * @param bytes byte array * @param offset offset into array diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java index ee03c7b..4c032267 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferUtils.java @@ -195,6 +195,14 @@ public class TestByteBufferUtils { } } + @Test + public void testUnsafeAllocate() throws Exception { + ByteBuffer allocate = ByteBufferUtils.allocateDirectBuffer(100); + ByteBufferUtils.putInt(allocate, 100); + int res = ByteBufferUtils.toInt(allocate, 0); + assertEquals("Should get back the same value", 100, res); + } + /** * Test copying to stream from buffer. */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index 3c27f14..1c96e2a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -329,7 +329,7 @@ public class BucketCache implements BlockCache, HeapSize { return new FileMmapEngine(ioEngineName.substring(5), capacity); } else { throw new IllegalArgumentException( - "Don't understand io engine name for cache - prefix with file:, heap or offheap"); + "Don't understand io engine name for cache - prefix with file:, mmap:, heap or offheap"); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java index 63de32c..cded18b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.util.ByteBufferAllocator; import org.apache.hadoop.hbase.util.ByteBufferArray; +import org.apache.hadoop.hbase.util.ByteBufferUtils; /** * IO engine that stores data in memory using an array of ByteBuffers @@ -85,7 +86,7 @@ public class ByteBufferIOEngine implements IOEngine { public ByteBuffer allocate(long size, boolean directByteBuffer) throws IOException { if (directByteBuffer) { - return ByteBuffer.allocateDirect((int) size); + return ByteBufferUtils.allocateDirectBuffer((int) size); } else { return ByteBuffer.allocate((int) size); }