diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
index 2e14b13a2b..3f8add74ee 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ByteBufferArray.java
@@ -53,6 +53,10 @@ public class ByteBufferArray {
   @VisibleForTesting
   int bufferCount;
 
+  public ByteBufferArray(long capacity, ByteBufferAllocator allocator) throws IOException {
+    this(capacity, DEFAULT_BUFFER_SIZE, allocator);
+  }
+
   /**
    * We allocate a number of byte buffers as the capacity. In order not to out
    * of the array bounds for the last byte(see {@link ByteBufferArray#multiple}),
@@ -61,14 +65,15 @@ public class ByteBufferArray {
+   * @param bufferSize the size of each buffer to allocate
    * @param allocator the ByteBufferAllocator that will create the buffers
    * @throws IOException throws IOException if there is an exception thrown by the allocator
    */
-  public ByteBufferArray(long capacity, ByteBufferAllocator allocator)
+  public ByteBufferArray(long capacity, int bufferSize, ByteBufferAllocator allocator)
       throws IOException {
-    this.bufferSize = DEFAULT_BUFFER_SIZE;
+    this.bufferSize = bufferSize;
     if (this.bufferSize > (capacity / 16))
       this.bufferSize = (int) roundUp(capacity / 16, 32768);
-    this.bufferCount = (int) (roundUp(capacity, bufferSize) / bufferSize);
+    this.bufferCount = (int) (roundUp(capacity, this.bufferSize) / this.bufferSize);
     LOG.info("Allocating buffers total=" + StringUtils.byteDesc(capacity)
-      + ", sizePerBuffer=" + StringUtils.byteDesc(bufferSize) + ", count="
+      + ", sizePerBuffer=" + StringUtils.byteDesc(this.bufferSize) + ", count="
       + bufferCount);
     buffers = new ByteBuffer[bufferCount + 1];
     createBuffers(allocator);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 333b7ef799..9fc192c8f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializerIdManager;
 import org.apache.hadoop.hbase.io.hfile.CachedBlock;
 import org.apache.hadoop.hbase.io.hfile.HFileBlock;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.util.ByteBufferArray;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
@@ -111,6 +112,7 @@ public class BucketCache implements BlockCache, HeapSize {
   static final String EXTRA_FREE_FACTOR_CONFIG_NAME = "hbase.bucketcache.extrafreefactor";
   static final String ACCEPT_FACTOR_CONFIG_NAME = "hbase.bucketcache.acceptfactor";
   static final String MIN_FACTOR_CONFIG_NAME = "hbase.bucketcache.minfactor";
+  static final String BUFFER_SIZE = "hbase.bucketcache.buffer.size";
 
   /** Priority buckets */
   @VisibleForTesting
@@ -259,7 +261,8 @@ public class BucketCache implements BlockCache, HeapSize {
       int writerThreadNum, int writerQLen, String persistencePath, int ioErrorsTolerationDuration,
       Configuration conf)
       throws FileNotFoundException, IOException {
-    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, persistencePath);
+    int bufferSize = conf.getInt(BUFFER_SIZE, ByteBufferArray.DEFAULT_BUFFER_SIZE);
+    this.ioEngine = getIOEngineFromName(ioEngineName, capacity, bufferSize, persistencePath);
     this.writerThreads = new WriterThread[writerThreadNum];
     long blockNumCapacity = capacity / blockSize;
     if (blockNumCapacity >= Integer.MAX_VALUE) {
@@ -371,8 +374,8 @@ public class BucketCache implements BlockCache, HeapSize {
    * @return the IOEngine
    * @throws IOException
    */
-  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, String persistencePath)
-      throws IOException {
+  private IOEngine getIOEngineFromName(String ioEngineName, long capacity, int bufferSize,
+      String persistencePath) throws IOException {
     if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
       // In order to make the usage simple, we only need the prefix 'files:' in
       // document whether one or multiple file(s), but also support 'file:' for
@@ -381,9 +384,11 @@ public class BucketCache implements BlockCache, HeapSize {
         .split(FileIOEngine.FILE_DELIMITER);
       return new FileIOEngine(capacity, persistencePath != null, filePaths);
     } else if (ioEngineName.startsWith("offheap")) {
-      return new ByteBufferIOEngine(capacity);
+      return new ByteBufferIOEngine(capacity, bufferSize);
     } else if (ioEngineName.startsWith("mmap:")) {
-      return new FileMmapEngine(ioEngineName.substring(5), capacity);
+      return new FileMmapEngine(ioEngineName.substring(5), capacity, bufferSize);
+    } else if (ioEngineName.startsWith("pmem:")) {
+      return new PmemIOEngine(ioEngineName.substring(5), capacity, bufferSize);
     } else {
       throw new IllegalArgumentException(
-        "Don't understand io engine name for cache- prefix with file:, files:, mmap: or offheap");
+        "Don't understand io engine name for cache- prefix with file:, files:, mmap:, pmem: or offheap");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
index 3b832fe397..835d45cacf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
@@ -74,7 +74,8 @@ public class ByteBufferIOEngine implements IOEngine {
    * @param capacity
+   * @param bufferSize the size of each buffer backing the engine
    * @throws IOException ideally here no exception to be thrown from the allocator
    */
-  public ByteBufferIOEngine(long capacity)
+  public ByteBufferIOEngine(long capacity, int bufferSize)
       throws IOException {
     this.capacity = capacity;
     ByteBufferAllocator allocator = new ByteBufferAllocator() {
@@ -83,7 +84,7 @@ public class ByteBufferIOEngine implements IOEngine {
         return ByteBuffer.allocateDirect((int) size);
       }
     };
-    bufferArray = new ByteBufferArray(capacity, allocator);
+    bufferArray = new ByteBufferArray(capacity, bufferSize, allocator);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java
index 82f42cda2a..3f254b897a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java
@@ -44,13 +44,13 @@ public class FileMmapEngine implements IOEngine {
 
   static final Logger LOG = LoggerFactory.getLogger(FileMmapEngine.class);
 
-  private final String path;
-  private long size;
-  private ByteBufferArray bufferArray;
+  protected final String path;
+  protected long size;
+  protected ByteBufferArray bufferArray;
   private final FileChannel fileChannel;
   private RandomAccessFile raf = null;
 
-  public FileMmapEngine(String filePath, long capacity) throws IOException {
+  public FileMmapEngine(String filePath, long capacity, int bufferSize) throws IOException {
     this.path = filePath;
     this.size = capacity;
     long fileSize = 0;
@@ -78,7 +78,7 @@ public class FileMmapEngine implements IOEngine {
         return buffer;
       }
     };
-    bufferArray = new ByteBufferArray(fileSize, allocator);
+    bufferArray = new ByteBufferArray(fileSize, bufferSize, allocator);
   }
 
   private long roundUp(long n, long to) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PmemIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PmemIOEngine.java
new file mode 100644
index 0000000000..e2b1a41d28
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/PmemIOEngine.java
@@ -0,0 +1,64 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile.bucket;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.io.hfile.Cacheable;
+import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
+import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
+import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * IO engine that stores data in pmem devices such as ApachePass.
+ */
+@InterfaceAudience.Private
+public class PmemIOEngine extends FileMmapEngine {
+
+  // TODO this will support only one path over Pmem. To make use of multiple Pmem devices mounted,
+  // we need to support multiple paths like files IOEngine. Support later.
+  public PmemIOEngine(String filePath, long capacity, int bufferSize) throws IOException {
+    super(filePath, capacity, bufferSize);
+  }
+
+  @Override
+  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
+      throws IOException {
+    ByteBuff dstBuffer = bufferArray.asSubByteBuff(offset, length);
+    // Here the buffer that is created directly refers to the buffer in the actual buckets.
+    // When any cell is referring to the blocks created out of these buckets then it means that
+    // those cells are referring to a shared memory area which if evicted by the BucketCache would
+    // lead to corruption of results. Hence we set the type of the buffer as SHARED_MEMORY
+    // so that the readers using this block are aware of this fact and do the necessary action
+    // to prevent eviction till the results are either consumed or copied
+    return deserializer.deserialize(dstBuffer, true, MemoryType.SHARED);
+  }
+
+  @Override
+  public boolean usesSharedMemory() {
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path + ", size="
+      + String.format("%,d", this.size);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index bb58b4e9d3..5f4d0a2042 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ByteBufferArray;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -47,7 +48,8 @@ public class TestByteBufferIOEngine {
     int capacity = 32 * 1024 * 1024; // 32 MB
     int testNum = 100;
     int maxBlockSize = 64 * 1024;
-    ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity);
+    ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity,
+        ByteBufferArray.DEFAULT_BUFFER_SIZE);
     int testOffsetAtStartNum = testNum / 10;
     int testOffsetAtEndNum = testNum / 10;
     for (int i = 0; i < testNum; i++) {
@@ -117,7 +119,8 @@ public class TestByteBufferIOEngine {
     int capacity = 32 * 1024 * 1024; // 32 MB
     int testNum = 100;
     int maxBlockSize = 64 * 1024;
-    ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity);
+    ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(capacity,
+        ByteBufferArray.DEFAULT_BUFFER_SIZE);
     int testOffsetAtStartNum = testNum / 10;
     int testOffsetAtEndNum = testNum / 10;
     for (int i = 0; i < testNum; i++) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java
index 2748d80bdb..895485c1a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer;
 import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.testclassification.IOTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.ByteBufferArray;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -46,7 +47,8 @@ public class TestFileMmapEngine {
     int size = 2 * 1024 * 1024; // 2 MB
     String filePath = "testFileMmapEngine";
     try {
-      FileMmapEngine fileMmapEngine = new FileMmapEngine(filePath, size);
+      FileMmapEngine fileMmapEngine = new FileMmapEngine(filePath, size,
+          ByteBufferArray.DEFAULT_BUFFER_SIZE);
       for (int i = 0; i < 50; i++) {
         int len = (int) Math.floor(Math.random() * 100);
         long offset = (long) Math.floor(Math.random() * size % (size - len));
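
Usage sketch (not part of the patch): the snippet below shows how the new hbase.bucketcache.buffer.size knob flows into an engine, for anyone exercising the change in isolation. The class name and the 8 MB / 32 MB values are made up for the example; in a running cluster the engine is constructed by BucketCache.getIOEngineFromName() rather than by hand.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.io.hfile.bucket.ByteBufferIOEngine;
import org.apache.hadoop.hbase.util.ByteBufferArray;

public class BufferSizeSketch {
  public static void main(String[] args) throws Exception {
    // Resolve the per-buffer size the way the patched BucketCache constructor
    // does: read the new knob, falling back to the existing default.
    Configuration conf = HBaseConfiguration.create();
    conf.setInt("hbase.bucketcache.buffer.size", 8 * 1024 * 1024); // example value
    int bufferSize = conf.getInt("hbase.bucketcache.buffer.size",
        ByteBufferArray.DEFAULT_BUFFER_SIZE);

    // Hand the size to the off-heap engine, mirroring what the patched
    // getIOEngineFromName() does for the "offheap" prefix. A 32 MB capacity
    // keeps the direct-memory allocation small for a local experiment.
    ByteBufferIOEngine engine = new ByteBufferIOEngine(32L * 1024 * 1024, bufferSize);
    System.out.println(engine);
  }
}

The same bufferSize value reaches FileMmapEngine and the new PmemIOEngine, so it also bounds the size of each mapped region when the cache is configured with, e.g., hbase.bucketcache.ioengine=pmem:/mnt/pmem0/bucketcache (example mount path).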