diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferAllocator.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferAllocator.java index 2c5eac829c..5de932954c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferAllocator.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferAllocator.java @@ -30,9 +30,10 @@ public interface ByteBufferAllocator { /** * Allocates a bytebuffer + * @param index The index of this buffer * @param size the size of the bytebuffer * @return the bytebuffer that is created * @throws IOException exception thrown if there is an error while creating the ByteBuffer */ - ByteBuffer allocate(long size) throws IOException; + ByteBuffer allocate(int index, long size) throws IOException; } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java index 2e14b13a2b..f1a82de13c 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java @@ -86,9 +86,9 @@ public class ByteBufferArray { try { for (int i = 0; i < threadCount; i++) { // Last thread will have to deal with a different number of buffers - int buffersToCreate = (i == threadCount - 1) ? lastThreadCount : perThreadCount; + int buffersCount = (i == threadCount - 1) ? lastThreadCount : perThreadCount; futures[i] = service.submit( - new BufferCreatorCallable(bufferSize, buffersToCreate, allocator)); + new BufferCreatorCallable(bufferSize, (i * perThreadCount), buffersCount, allocator)); } int bufferIndex = 0; for (Future future : futures) { @@ -120,11 +120,14 @@ public class ByteBufferArray { */ private static class BufferCreatorCallable implements Callable { private final int bufferCapacity; + private final int bufferIndexFrom; private final int bufferCount; private final ByteBufferAllocator allocator; - BufferCreatorCallable(int bufferCapacity, int bufferCount, ByteBufferAllocator allocator) { + BufferCreatorCallable(int bufferCapacity, int bufferIndexFrom, int bufferCount, + ByteBufferAllocator allocator) { this.bufferCapacity = bufferCapacity; + this.bufferIndexFrom = bufferIndexFrom; this.bufferCount = bufferCount; this.allocator = allocator; } @@ -133,7 +136,7 @@ public class ByteBufferArray { public ByteBuffer[] call() throws Exception { ByteBuffer[] buffers = new ByteBuffer[this.bufferCount]; for (int i = 0; i < this.bufferCount; i++) { - buffers[i] = allocator.allocate(this.bufferCapacity); + buffers[i] = allocator.allocate((this.bufferIndexFrom + i), this.bufferCapacity); } return buffers; } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java index 3fc1c230f5..530aab2bd8 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestByteBufferArray.java @@ -43,7 +43,7 @@ public class TestByteBufferArray { int capacity = 4 * 1024 * 1024; ByteBufferAllocator allocator = new ByteBufferAllocator() { @Override - public ByteBuffer allocate(long size) throws IOException { + public ByteBuffer allocate(int index, long size) throws IOException { return ByteBuffer.allocateDirect((int) size); } }; @@ -61,7 +61,7 @@ public class TestByteBufferArray { int capacity = 470 * 1021 * 1023; ByteBufferAllocator allocator = new ByteBufferAllocator() { @Override - public ByteBuffer allocate(long size) throws IOException { + public ByteBuffer allocate(int index, long size) throws IOException { return ByteBuffer.allocateDirect((int) size); } }; @@ -80,7 +80,7 @@ public class TestByteBufferArray { public void testByteBufferCreation1() throws Exception { ByteBufferAllocator allocator = new ByteBufferAllocator() { @Override - public ByteBuffer allocate(long size) throws IOException { + public ByteBuffer allocate(int index, long size) throws IOException { return ByteBuffer.allocateDirect((int) size); } }; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java index 3b832fe397..5a60624269 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java @@ -79,7 +79,7 @@ public class ByteBufferIOEngine implements IOEngine { this.capacity = capacity; ByteBufferAllocator allocator = new ByteBufferAllocator() { @Override - public ByteBuffer allocate(long size) throws IOException { + public ByteBuffer allocate(int index, long size) throws IOException { return ByteBuffer.allocateDirect((int) size); } }; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java index 82f42cda2a..4f064c0d00 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -70,11 +69,10 @@ public class FileMmapEngine implements IOEngine { throw ioex; } ByteBufferAllocator allocator = new ByteBufferAllocator() { - AtomicInteger pos = new AtomicInteger(0); @Override - public ByteBuffer allocate(long size) throws IOException { + public ByteBuffer allocate(int index, long size) throws IOException { ByteBuffer buffer = fileChannel.map(java.nio.channels.FileChannel.MapMode.READ_WRITE, - pos.getAndIncrement() * size, size); + (index * size), size); return buffer; } }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index f4e4e53aa2..90497ed5cd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -244,11 +244,16 @@ public class TestBucketCache { @Test public void testRetrieveFromFile() throws Exception { + testRetrieveFromFile("file:"); + testRetrieveFromFile("mmap:"); + } + + private void testRetrieveFromFile(String ioEnginePrefix) throws Exception { HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); Path testDir = TEST_UTIL.getDataTestDir(); TEST_UTIL.getTestFileSystem().mkdirs(testDir); - String ioEngineName = "file:" + testDir + "/bucket.cache"; + String ioEngineName = ioEnginePrefix + testDir + "/bucket.cache"; String persistencePath = testDir + "/bucket.persistence"; BucketCache bucketCache = new BucketCache(ioEngineName, capacitySize, constructedBlockSize, @@ -275,6 +280,10 @@ public class TestBucketCache { constructedBlockSizes, writeThreads, writerQLen, persistencePath); assertFalse(new File(persistencePath).exists()); assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); + for (HFileBlockPair pair : blocks) { + Cacheable blockRead = bucketCache.getBlock(pair.getBlockName(), false, false, false); + assertTrue(pair.getBlock().equals(blockRead)); + } // persist cache to file bucketCache.shutdown(); assertTrue(new File(persistencePath).exists());