From 6cf632cf531422105c1f17336030f7eaa9d576e0 Mon Sep 17 00:00:00 2001 From: Aaron Tokhy Date: Wed, 30 Nov 2016 17:14:45 -0800 Subject: [PATCH] HBASE-15314 Allow more than one backing file in bucketcache --- .../hadoop/hbase/io/hfile/bucket/BucketCache.java | 24 ++ .../hbase/io/hfile/bucket/ByteBufferIOEngine.java | 21 ++ .../hadoop/hbase/io/hfile/bucket/FileIOEngine.java | 287 +++++++++++++++---- .../hbase/io/hfile/bucket/FileMmapEngine.java | 10 + .../hadoop/hbase/io/hfile/bucket/IOEngine.java | 12 + .../hadoop/hbase/io/hfile/CacheTestUtils.java | 1 + .../hbase/io/hfile/bucket/TestBucketCache.java | 86 ++++-- .../io/hfile/bucket/TestByteBufferIOEngine.java | 14 + .../hbase/io/hfile/bucket/TestFileIOEngine.java | 315 +++++++++++++++++++-- .../hbase/io/hfile/bucket/TestFileMmapEngine.java | 61 ++-- 10 files changed, 712 insertions(+), 119 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java index a36423e..6074e97 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java @@ -74,6 +74,8 @@ import org.apache.hadoop.hbase.util.IdReadWriteLock; import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableList; +import com.google.common.primitives.Longs; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** @@ -310,6 +312,8 @@ public class BucketCache implements BlockCache, HeapSize { throws IOException { if (ioEngineName.startsWith("file:")) { return new FileIOEngine(ioEngineName.substring(5), capacity); + } else if (ioEngineName.startsWith("files:")) { + return new FileIOEngine(ioEngineName.substring(6).split(","), capacity); } else if (ioEngineName.startsWith("offheap")) { return new ByteBufferIOEngine(capacity, true); } else if (ioEngineName.startsWith("heap")) { @@ -1319,7 +1323,20 @@ public class BucketCache implements BlockCache, HeapSize { int len = data.getSerializedLength(); // This cacheable thing can't be serialized if (len == 0) return null; + long[] failedSegmentOffsets = null; long offset = bucketAllocator.allocateBlock(len); + int failedSegmentCount = 0; + if (ioEngine.isSegmented()) { + failedSegmentOffsets = new long[1]; + while (ioEngine.allocationCrossedSegments(offset, len)) { + if (failedSegmentCount == failedSegmentOffsets.length) { + failedSegmentOffsets = Longs.ensureCapacity(failedSegmentOffsets, + failedSegmentOffsets.length * 2, 0); + } + failedSegmentOffsets[failedSegmentCount++] = offset; + offset = bucketAllocator.allocateBlock(len); + } + } BucketEntry bucketEntry = new BucketEntry(offset, len, accessCounter, inMemory); bucketEntry.setDeserialiserReference(data.getDeserializer(), deserialiserMap); try { @@ -1339,8 +1356,15 @@ public class BucketCache implements BlockCache, HeapSize { ioEngine.write(bb, offset); } } catch (IOException ioe) { + // CacheFullException is an IOException // free it in bucket allocator bucketAllocator.freeBlock(offset); + if (ioEngine.isSegmented()) { + // free any failed allocations due to segmentation + for (int i = 0; i < failedSegmentCount; ++i) { + bucketAllocator.freeBlock(failedSegmentOffsets[i]); + } + } throw ioe; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java index 63de32c..51119d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java @@ -110,6 +110,27 @@ public class ByteBufferIOEngine implements IOEngine { return false; } + /** + * The Memory IO engine is not a segmented buffer + * @return false + */ + @Override + public boolean isSegmented() { + return false; + } + + /** + * The Memory IO engine is not segmented, so an allocation can never cross a segment + * + * @param offset the offset of the allocation + * @param len the length of the allocation + * @return false + */ + @Override + public boolean allocationCrossedSegments(long offset, long len) { + return false; + } + @Override public Cacheable read(long offset, int length, CacheableDeserializer deserializer) throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java index aaf5cf9..ecb71a3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java @@ -18,11 +18,21 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Joiner; +import com.google.common.math.LongMath; + +import java.io.Closeable; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; +import java.math.RoundingMode; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.List; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -34,47 +44,137 @@ import org.apache.hadoop.hbase.nio.SingleByteBuff; import org.apache.hadoop.util.StringUtils; /** - * IO engine that stores data to a file on the local file system. 
+ * IO engine that stores data to one or more files on the local file system.
  */
 @InterfaceAudience.Private
 public class FileIOEngine implements IOEngine {
   private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
-  private final RandomAccessFile raf;
-  private final FileChannel fileChannel;
-  private final String path;
-  private long size;
-  public FileIOEngine(String filePath, long fileSize) throws IOException {
-    this.path = filePath;
-    this.size = fileSize;
-    try {
-      raf = new RandomAccessFile(filePath, "rw");
-    } catch (java.io.FileNotFoundException fex) {
-      LOG.error("Can't create bucket cache file " + filePath, fex);
-      throw fex;
+  private final List<SegmentFile> segments;
+  private final long fileSize;
+  private final long totalSize;
+
+  class SegmentFile implements Closeable {
+    final RandomAccessFile raf;
+    final FileChannel channel;
+    final String path;
+    final long endOffset;
+
+    SegmentFile(String path, long endOffset) throws IOException {
+      this.path = path;
+      this.endOffset = endOffset;
+      try {
+        this.raf = new RandomAccessFile(path, "rw");
+      } catch (FileNotFoundException fex) {
+        LOG.error("Cannot create bucket cache file " + path, fex);
+        throw fex;
+      }
+      try {
+        LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", to the file: " + path);
+        this.raf.setLength(fileSize);
+      } catch (IOException ex) {
+        LOG.error("Cannot extend bucket cache file " + path + "; insufficient space for "
+            + StringUtils.byteDesc(fileSize), ex);
+        closeRandomAccessFile();
+        throw ex;
+      }
+      this.channel = this.raf.getChannel();
+    }
+
+    @Override
+    public void close() throws IOException {
+      IOException lastException = null;
+      try {
+        closeChannel();
+      } catch (IOException ex) {
+        LOG.error("Failed to close the FileChannel for " + path, ex);
+        lastException = ex;
+      }
+      try {
+        closeRandomAccessFile();
+      } catch (IOException ex) {
+        LOG.error("Failed to close the RandomAccessFile for " + path, ex);
+        lastException = ex;
+      }
+      if (lastException != null) {
+        throw lastException;
+      }
+    }
+
+    @Override
+    public String toString() {
+      return "path=" + path + ", endOffset=" + endOffset;
+    }
+
+    void closeChannel() throws IOException {
+      getChannel().close();
+    }
+
+    void closeRandomAccessFile() throws IOException {
+      getRandomAccessFile().close();
+    }
+
+    void fsync() throws IOException {
+      getChannel().force(true);
+    }
+
+    FileChannel getChannel() {
+      return channel;
     }
-    try {
-      raf.setLength(fileSize);
-    } catch (IOException ioex) {
-      LOG.error("Can't extend bucket cache file; insufficient space for "
-          + StringUtils.byteDesc(fileSize), ioex);
-      raf.close();
-      throw ioex;
+    RandomAccessFile getRandomAccessFile() {
+      return raf;
     }
+  }
-    fileChannel = raf.getChannel();
-    LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath);
+  public FileIOEngine(String filePath, long totalSize) throws IOException {
+    this(new String[] {filePath}, totalSize);
   }
+  /**
+   * Initializes a FileIOEngine which can be used by the HBase bucket cache.
+   *
+   * If 'totalSize' is not evenly divisible by the number of files given, this will allocate
+   * extra (unused) space so that all of the segments are the same size.
+   *
+   * @param filePaths paths to the files this FileIOEngine will read and write to
+   * @param totalSize the total aggregate size for all of the files
+   * @throws IOException if initializing the IOEngine fails for any reason.
+ */ + public FileIOEngine(String[] filePaths, long totalSize) throws IOException { + int numFiles = filePaths.length; + this.segments = new ArrayList<>(numFiles); + + this.fileSize = LongMath.divide(totalSize, numFiles, RoundingMode.CEILING); + this.totalSize = fileSize * numFiles; + if (this.totalSize > totalSize) { + LOG.warn("Initializing with a total size of " + this.totalSize + " when " + totalSize + + " was requested."); + } + + for (int i = 0; i < numFiles; ++i) { + long endOffset = this.fileSize * (i + 1); + String path = filePaths[i].trim(); + try { + SegmentFile segmentFile = new SegmentFile(path, endOffset); + this.segments.add(i, segmentFile); + } catch (IOException ex) { + LOG.error("Failed to initialize a file for this IOEngine " + path + ". Shutting down", ex); + shutdown(); + throw ex; + } + } + } + @Override public String toString() { - return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path + - ", size=" + String.format("%,d", this.size); + return String.format("ioengine=%s, segments=[%s], size=%,d", getClass().getSimpleName(), + Joiner.on(", ").join(getSegments()), totalSize); } /** * File IO engine is always able to support persistent storage for the cache + * * @return true */ @Override @@ -83,71 +183,150 @@ public class FileIOEngine implements IOEngine { } /** - * Transfers data from file to the given byte buffer - * @param offset The offset in the file where the first byte to be read - * @param length The length of buffer that should be allocated for reading - * from the file channel - * @return number of bytes read - * @throws IOException + * @return true if more than 1 file is used, false otherwise */ @Override + public boolean isSegmented() { + return getSegments().size() > 1; + } + + /** + * Determines whether or not a successful allocation crossed a segment. + * + * @param offset the offset of the allocation + * @param len the length of the allocation. + * @return true if the allocation crosses a segment boundary, or crosses beyond the totalSize + * @throws IllegalArgumentException if offset or the offset at length is less than 0 or greater + * than or equal to the total size. + */ + @Override + public boolean allocationCrossedSegments(long offset, long len) { + long offsetAtLength = offset + len; + checkOffset(offset); + SegmentFile segment = getSegment(offset); + return segment.endOffset < offsetAtLength; + } + + @Override public Cacheable read(long offset, int length, CacheableDeserializer deserializer) throws IOException { ByteBuffer dstBuffer = ByteBuffer.allocate(length); - fileChannel.read(dstBuffer, offset); + // The buffer created out of the fileChannel is formed by copying the data from the file // Hence in this case there is no shared memory that we point to. Even if the BucketCache evicts // this buffer from the file the data is already copied and there is no need to ensure that // the results are not corrupted before consuming them. 
     if (dstBuffer.limit() != length) {
-      throw new RuntimeException("Only " + dstBuffer.limit() + " bytes read, " + length
-          + " expected");
+      throw new RuntimeException(String.format("Only %d bytes read, %d expected",
+          dstBuffer.limit(), length));
+    }
+    if (allocationCrossedSegments(offset, dstBuffer.remaining())) {
+      throw new IllegalArgumentException(String.format("Read operation, with offset %d and "
+          + "length %d, is crossing a segment", offset, dstBuffer.remaining()));
     }
+    getSegment(offset).getChannel().read(dstBuffer, getFileOffset(offset));
     return deserializer.deserialize(new SingleByteBuff(dstBuffer), true, MemoryType.EXCLUSIVE);
   }
   /**
    * Transfers data from the given byte buffer to file
+   *
+   * @param srcByteBuff the given byte buffer from which bytes are to be read
+   * @param offset The offset in the IOEngine's total size where the first byte is to be written
+   * @throws IOException if the write fails
+   * @throws IllegalArgumentException if the offset, or the offset plus the buffer's remaining
+   *           length, is negative or greater than or equal to the total size, or if the write
+   *           would cross a segment boundary.
+   */
+  @Override
+  public void write(ByteBuff srcByteBuff, long offset) throws IOException {
+    // When caching block into BucketCache there will be single buffer backing for this HFileBlock.
+    assert srcByteBuff.hasArray();
+    ByteBuffer srcByteBuffer = ByteBuffer.wrap(srcByteBuff.array(), srcByteBuff.arrayOffset(),
+        srcByteBuff.remaining());
+    write(srcByteBuffer, offset);
+  }
+
+  /**
+   * Transfers data from the given byte buffer to file
+   *
    * @param srcBuffer the given byte buffer from which bytes are to be read
-   * @param offset The offset in the file where the first byte to be written
-   * @throws IOException
+   * @param offset The offset in the IOEngine's total size where the first byte is to be written
+   * @throws IOException if the write fails
+   * @throws IllegalArgumentException if the offset, or the offset plus the buffer's remaining
+   *           length, is negative or greater than or equal to the total size, or if the write
+   *           would cross a segment boundary.
    */
   @Override
   public void write(ByteBuffer srcBuffer, long offset) throws IOException {
-    fileChannel.write(srcBuffer, offset);
+    assert srcBuffer.hasArray();
+    if (allocationCrossedSegments(offset, srcBuffer.remaining())) {
+      throw new IllegalArgumentException(String.format("Write operation, with offset %d and "
+          + "length %d, is crossing a segment", offset, srcBuffer.remaining()));
+    }
+    getSegment(offset).getChannel().write(srcBuffer, getFileOffset(offset));
   }
   /**
    * Sync the data to file after writing
-   * @throws IOException
+   *
+   * @throws IOException if one segment fails to sync.
    */
   @Override
   public void sync() throws IOException {
-    fileChannel.force(true);
+    for (SegmentFile segment : getSegments()) {
+      segment.fsync();
+    }
   }
   /**
-   * Close the file
+   * Close the files
    */
   @Override
   public void shutdown() {
-    try {
-      fileChannel.close();
-    } catch (IOException ex) {
-      LOG.error("Can't shutdown cleanly", ex);
-    }
-    try {
-      raf.close();
-    } catch (IOException ex) {
-      LOG.error("Can't shutdown cleanly", ex);
+    for (SegmentFile segment : getSegments()) {
+      IOUtils.closeQuietly(segment);
     }
   }
-  @Override
-  public void write(ByteBuff srcBuffer, long offset) throws IOException {
-    // When caching block into BucketCache there will be single buffer backing for this HFileBlock.
- assert srcBuffer.hasArray(); - fileChannel.write( - ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(), srcBuffer.remaining()), offset); + @VisibleForTesting + long getTotalSize() { + return totalSize; + } + + @VisibleForTesting + List getSegments() { + return segments; + } + + /** + * Obtain the segment corresponding to the given offset. + * + * @param offset the offset + * @return The segment containing the offset + */ + private SegmentFile getSegment(long offset) { + int segmentIdx = (int) (offset / fileSize); + return getSegments().get(segmentIdx); } + + /** + * Obtain the file offset within a segment + * + * @param offset the offset + * @return the real offset within the segment + */ + private long getFileOffset(long offset) { + return offset % fileSize; + } + + private void checkOffset(long offset) { + if (offset < 0L) { + throw new IllegalArgumentException("offset is negative"); + } else if (offset >= totalSize) { + throw new IllegalArgumentException(String.format("offset %d must not be greater than or " + + "equal to size %d", offset, totalSize)); + } + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java index 7a2afe8..38513a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java @@ -107,6 +107,16 @@ public class FileMmapEngine implements IOEngine { } @Override + public boolean isSegmented() { + return false; + } + + @Override + public boolean allocationCrossedSegments(long offset, long len) { + return false; + } + + @Override public Cacheable read(long offset, int length, CacheableDeserializer deserializer) throws IOException { byte[] dst = new byte[length]; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java index ab673f4..e4de7b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java @@ -38,6 +38,18 @@ public interface IOEngine { boolean isPersistent(); /** + * @return true if the IOEngine is segmented at specific boundaries + */ + boolean isSegmented(); + + /** + * @param offset the offset of the allocation + * @param len the length of the allocation + * @return true if the allocation would cross a segment boundary, false otherwise + */ + boolean allocationCrossedSegments(long offset, long len); + + /** * Transfers data from IOEngine to a Cacheable object. 
* @param length How many bytes to be read from the offset * @param offset The offset in the IO engine where the first byte to be read diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java index bd3f4c7..40ef6ca 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/CacheTestUtils.java @@ -275,6 +275,7 @@ public class CacheTestUtils { @Override public Cacheable deserialize(ByteBuff b) throws IOException { + b.rewind(); int len = b.getInt(); Thread.yield(); byte buf[] = new byte[len]; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java index 6fe352d..d4559d2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestBucketCache.java @@ -40,16 +40,18 @@ import org.apache.hadoop.hbase.io.hfile.bucket.BucketAllocator.IndexStatistics; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.junit.After; +import org.junit.AfterClass; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; /** * Basic test of BucketCache.Puts and gets. - *

- * Tests will ensure that blocks' data correctness under several threads concurrency + *

Tests will ensure the correctness of blocks' data under concurrent access from several threads,
+ * across different IOEngine implementations

*/ @RunWith(Parameterized.class) @Category({ IOTests.class, SmallTests.class }) @@ -57,22 +59,35 @@ public class TestBucketCache { private static final Random RAND = new Random(); - @Parameterized.Parameters(name = "{index}: blockSize={0}, bucketSizes={1}") + public static TemporaryFolder FOLDER = initStaticTemp(); + + @Parameterized.Parameters(name = "{index}: ioengine={0}, blockSize={1}, bucketSizes={2}") public static Iterable data() { return Arrays.asList(new Object[][] { - { 8192, null }, // TODO: why is 8k the default blocksize for these tests? + { "heap", 8192, null }, + { "offheap", 8192, null }, + { "file", 8192, null }, + { "files", 8192, null }, // TODO: why is 8k the default blocksize for these tests? + { + "offheap", + 16 * 1024, + new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, + 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, + 128 * 1024 + 1024 }, + }, { + "files", 16 * 1024, new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024, 8 * 1024 + 1024, 16 * 1024 + 1024, 28 * 1024 + 1024, 32 * 1024 + 1024, 64 * 1024 + 1024, 96 * 1024 + 1024, - 128 * 1024 + 1024 } } }); + 128 * 1024 + 1024 }, + }, + }); } - @Parameterized.Parameter(0) - public int constructedBlockSize; - - @Parameterized.Parameter(1) - public int[] constructedBlockSizes; + private final String ioEngineName; + private final int constructedBlockSize; + private final int[] constructedBlockSizes; BucketCache cache; final int CACHE_SIZE = 1000000; @@ -80,13 +95,34 @@ public class TestBucketCache { final int BLOCK_SIZE = CACHE_SIZE / NUM_BLOCKS; final int NUM_THREADS = 100; final int NUM_QUERIES = 10000; + final int FILE_IO_ENGINE_NUM_FILES = 16; + final long CAPACITY_SIZE = 32 * 1024 * 1024; - final long capacitySize = 32 * 1024 * 1024; final int writeThreads = BucketCache.DEFAULT_WRITER_THREADS; final int writerQLen = BucketCache.DEFAULT_WRITER_QUEUE_ITEMS; - String ioEngineName = "heap"; String persistencePath = null; + public TestBucketCache(String ioEngineName, + int constructedBlockSize, + int[] constructedBlockSizes) throws Exception { + this.constructedBlockSize = constructedBlockSize; + this.constructedBlockSizes = constructedBlockSizes; + if ("files".equals(ioEngineName)) { + StringBuilder sb = new StringBuilder("files:"); + String delim = ""; + for (int i = 0; i < FILE_IO_ENGINE_NUM_FILES; ++i) { + sb.append(delim); + sb.append(FOLDER.newFile()); + delim = ","; + } + this.ioEngineName = sb.toString(); + } else if ("file".equals(ioEngineName)) { + this.ioEngineName = "file:/" + FOLDER.newFile(); + } else { + this.ioEngineName = ioEngineName; + } + } + private class MockedBucketCache extends BucketCache { public MockedBucketCache(String ioEngineName, long capacity, int blockSize, int[] bucketSizes, @@ -115,10 +151,23 @@ public class TestBucketCache { } } + protected static TemporaryFolder initStaticTemp() { + try { + return new TemporaryFolder() { + { + before(); + } + }; + } catch (Throwable t) { + throw new RuntimeException(t); + } + } + + @Before public void setup() throws FileNotFoundException, IOException { cache = - new MockedBucketCache(ioEngineName, capacitySize, constructedBlockSize, + new MockedBucketCache(ioEngineName, CAPACITY_SIZE, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, persistencePath); } @@ -127,6 +176,11 @@ public class TestBucketCache { cache.shutdown(); } + @AfterClass + public static void cleanup() throws Exception { + FOLDER.delete(); + } + /** * Return a random element from {@code a}. 
*/ @@ -230,7 +284,7 @@ public class TestBucketCache { Path testDir = TEST_UTIL.getDataTestDir(); TEST_UTIL.getTestFileSystem().mkdirs(testDir); - BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + BucketCache bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", CAPACITY_SIZE, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); long usedSize = bucketCache.getAllocator().getUsedSize(); @@ -250,7 +304,7 @@ public class TestBucketCache { bucketCache.shutdown(); // restore cache from file - bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", CAPACITY_SIZE, constructedBlockSize, constructedBlockSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); assertEquals(usedSize, bucketCache.getAllocator().getUsedSize()); @@ -260,7 +314,7 @@ public class TestBucketCache { // reconfig buckets sizes, the biggest bucket is small than constructedBlockSize (8k or 16k) // so it can't restore cache from file int[] smallBucketSizes = new int[] { 2 * 1024 + 1024, 4 * 1024 + 1024 }; - bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", capacitySize, + bucketCache = new BucketCache("file:" + testDir + "/bucket.cache", CAPACITY_SIZE, constructedBlockSize, smallBucketSizes, writeThreads, writerQLen, testDir + "/bucket.persistence"); assertEquals(0, bucketCache.getAllocator().getUsedSize()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java index bf15a59..f17563e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -148,4 +149,17 @@ public class TestByteBufferIOEngine { assert testOffsetAtStartNum == 0; assert testOffsetAtEndNum == 0; } + + @Test + public void testIsSegmented() throws Exception { + ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(100, false); + assertFalse(ioEngine.isSegmented()); + } + + @Test + public void testDoesAllocationCrossSegments() throws Exception { + ByteBufferIOEngine ioEngine = new ByteBufferIOEngine(100, false); + assertFalse(ioEngine.allocationCrossedSegments(0, 100)); + } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java index 93f4cf7..572f8d3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java @@ -18,51 +18,312 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; -import static org.junit.Assert.assertTrue; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; +import com.google.common.math.LongMath; +import com.google.common.primitives.Ints; import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer; import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; 
import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import java.io.File; +import java.io.RandomAccessFile; +import java.math.RoundingMode; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Basic test for {@link FileIOEngine} */ @Category({IOTests.class, SmallTests.class}) +@RunWith(Parameterized.class) public class TestFileIOEngine { + private static final Object[] ONE_FILE = parametersFor(1, 4096); + private static final Object[] TWO_FILES = parametersFor(2, 4096); + private static final Object[] EIGHT_FILES = parametersFor(8, 4096); + private static final Object[] MANY_FILES = parametersFor(24, 24 * 4096); + private static final Object[] EVENLY_DIVIDE = parametersFor(4, 4096); + private static final Object[] EVENLY_DIVIDE_LOW_STEP = parametersFor(4, 4095); + private static final Object[] EVENLY_DIVIDE_HIGH_STEP = parametersFor(4, 4093); + private static final Object[] EVENLY_DIVIDE_MINUS_1 = parametersFor(4, 4092); + @Rule + public TestName name = new TestName(); + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + @Rule + public ExpectedException thrown = ExpectedException.none(); + + // Parameterized on + private final int fileCount; + private final long totalSize; + + private String[] filePaths; + private File[] files; + private String[] shutdownTestPaths; + private File[] shutdownTestFiles; + private FileIOEngine fileIOEngine; + private int fileSize; + private long expectedTotalSize; + + private Random random; + + public TestFileIOEngine(int fileCount, long totalSize) { + this.fileCount = fileCount; + this.totalSize = totalSize; + } + + @Parameters(name = "{index}: fileCount={0}, totalSize={1}") + public static Collection data() { + return Arrays.asList( + ONE_FILE, + TWO_FILES, + EIGHT_FILES, + MANY_FILES, + EVENLY_DIVIDE, + EVENLY_DIVIDE_LOW_STEP, + EVENLY_DIVIDE_HIGH_STEP, + EVENLY_DIVIDE_MINUS_1 + ); + } + + private static Object[] parametersFor(int fileCount, long totalSize) { + return new Object[]{fileCount, totalSize}; + } + + @Before + public void setUp() throws Exception { + files = new File[fileCount]; + filePaths = new String[fileCount]; + shutdownTestFiles = new File[fileCount]; + shutdownTestPaths = new String[fileCount]; + for (int i = 0; i < fileCount; ++i) { + files[i] = folder.newFile(String.format("%s_FileCount%d_TotalSize%d_Index%d", + name.getMethodName(), fileCount, totalSize, i)); + filePaths[i] = files[i].getAbsolutePath(); + shutdownTestFiles[i] = folder.newFile( + String.format("ForShutdown_%s_FileCount%d_TotalSize%d_Index%d", + name.getMethodName(), fileCount, totalSize, i)); + shutdownTestPaths[i] = shutdownTestFiles[i].getAbsolutePath(); + } 
+ fileSize = Ints.checkedCast(LongMath.divide(totalSize, fileCount, RoundingMode.CEILING)); + expectedTotalSize = fileSize * fileCount; + fileIOEngine = new FileIOEngine(filePaths, totalSize); + random = new Random(); + } + + @After + public void tearDown() throws Exception { + fileIOEngine.shutdown(); + } + + @Test + public void testFileIOEngineConstructor() throws Exception { + assertEquals(expectedTotalSize, fileIOEngine.getTotalSize()); + for (File file : files) { + assertEquals(fileSize, file.length()); + } + } + + @Test + public void testIsPersistent() { + assertTrue(fileIOEngine.isPersistent()); + } + + @Test + public void testIsSegmented() throws Exception { + if (fileCount > 1) { + assertTrue(fileIOEngine.isSegmented()); + } else { + assertFalse(fileIOEngine.isSegmented()); + } + } + + public void testAllocationCrossedSegments() throws Exception { + for (int i = 0; i < fileCount; ++i) { + long offset = i * fileSize; + long endOffset = (i + 1) * fileSize; + assertFalse(fileIOEngine.allocationCrossedSegments(offset, fileSize)); + assertFalse(fileIOEngine.allocationCrossedSegments(endOffset - 1, 1)); + assertTrue(fileIOEngine.allocationCrossedSegments(offset, fileSize + 1)); + assertTrue(fileIOEngine.allocationCrossedSegments(endOffset - 1, 2)); + } + } + + @Test + public void testRead() throws Exception { + assertReadWithOffsetAndLength(0, fileSize); + assertReadWithOffsetAndLength(0, fileSize / 2); + assertReadWithOffsetAndLength(fileSize - 1, 1); + assertReadWithOffsetAndLength(fileSize / 2, fileSize - (fileSize / 2)); + } + + @Test + public void testWrite() throws Exception { + assertWriteWithOffsetAndLength(0, fileSize); + assertWriteWithOffsetAndLength(0, fileSize / 2); + assertWriteWithOffsetAndLength(fileSize - 1, 1); + assertWriteWithOffsetAndLength(fileSize / 2, fileSize - (fileSize / 2)); + } + @Test - public void testFileIOEngine() throws IOException { - int size = 2 * 1024 * 1024; // 2 MB - String filePath = "testFileIOEngine"; - try { - FileIOEngine fileIOEngine = new FileIOEngine(filePath, size); - for (int i = 0; i < 50; i++) { - int len = (int) Math.floor(Math.random() * 100); - long offset = (long) Math.floor(Math.random() * size % (size - len)); - byte[] data1 = new byte[len]; - for (int j = 0; j < data1.length; ++j) { - data1[j] = (byte) (Math.random() * 255); - } - fileIOEngine.write(ByteBuffer.wrap(data1), offset); + public void testReadCrossSegment() throws Exception { + assertExceptionOnReadWhenCrossingSegments(0, fileSize + 1); + assertExceptionOnReadWhenCrossingSegments(fileSize - 1, 2); + assertExceptionOnReadWhenCrossingSegments(fileSize / 2, fileSize - (fileSize / 2) + 1); + } + + @Test + public void testWriteCrossSegment() throws Exception { + assertExceptionOnWriteWhenCrossingSegments(0, fileSize + 1); + assertExceptionOnWriteWhenCrossingSegments(fileSize - 1, 2); + assertExceptionOnWriteWhenCrossingSegments(fileSize / 2, fileSize - (fileSize / 2) + 1); + } + + @Test + public void testReadNegativeOffset() throws Exception { + thrown.expect(IllegalArgumentException.class); + BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer(); + fileIOEngine.read(-1, 1, deserializer); + } + + @Test + public void testWriteNegativeOffset() throws Exception { + thrown.expect(IllegalArgumentException.class); + fileIOEngine.write(ByteBuffer.allocate(1), -1); + } + + @Test + public void testReadOutOfBoundsOffset() throws Exception { + thrown.expect(IllegalArgumentException.class); + BufferGrabbingDeserializer deserializer = new 
BufferGrabbingDeserializer(); + fileIOEngine.read(1, (int) expectedTotalSize, deserializer); + } + + @Test + public void testWriteOutOfBoundsOffset() throws Exception { + thrown.expect(IllegalArgumentException.class); + fileIOEngine.write(ByteBuffer.allocate(1), expectedTotalSize); + } + + @Test + public void testSync() throws Exception { + FileIOEngine spyFileIOEngine = spy(fileIOEngine); + List spySegments = + new ArrayList<>(spyFileIOEngine.getSegments().size()); + for (FileIOEngine.SegmentFile segmentFile : spyFileIOEngine.getSegments()) { + spySegments.add(spy(segmentFile)); + } + when(fileIOEngine.getSegments()).thenReturn(spySegments); + + spyFileIOEngine.sync(); + for (FileIOEngine.SegmentFile spySegment : spySegments) { + verify(spySegment, times(1)).fsync(); + } + } + + @Test + public void testShutdown() throws Exception { + FileIOEngine shutdownFileIOEngine = new FileIOEngine(shutdownTestPaths, fileSize); + FileIOEngine spyShutdownFileIOEngine = spy(shutdownFileIOEngine); + + List spySegments = + new ArrayList<>(spyShutdownFileIOEngine.getSegments().size()); + for (FileIOEngine.SegmentFile segment : spyShutdownFileIOEngine.getSegments()) { + spySegments.add(spy(segment)); + } + when(spyShutdownFileIOEngine.getSegments()).thenReturn(spySegments); + + spyShutdownFileIOEngine.shutdown(); + + for (FileIOEngine.SegmentFile spySegment : spySegments) { + verify(spySegment, times(1)).close(); + verify(spySegment, times(1)).closeChannel(); + verify(spySegment, times(1)).closeRandomAccessFile(); + } + } + + private void assertReadWithOffsetAndLength(long segmentOffset, int length) throws Exception { + for (int i = 0; i < fileCount; ++i) { + long offset = i * fileSize + segmentOffset; + byte[] expected = new byte[length]; + random.nextBytes(expected); + try (RandomAccessFile raf = new RandomAccessFile(files[i], "rw")) { + raf.seek(segmentOffset); + raf.write(expected); BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer(); - fileIOEngine.read(offset, len, deserializer); - ByteBuff data2 = deserializer.getDeserializedByteBuff(); - for (int j = 0; j < data1.length; ++j) { - assertTrue(data1[j] == data2.get(j)); - } + fileIOEngine.read(offset, length, deserializer); + ByteBuff byteBuff = deserializer.getDeserializedByteBuff(); + assertArrayEquals(String.format("off=%d, len=%d", offset, length), expected, + byteBuff.array()); } - } finally { - File file = new File(filePath); - if (file.exists()) { - file.delete(); + } + } + + private void assertWriteWithOffsetAndLength(long segmentOffset, int length) throws Exception { + for (int i = 0; i < fileCount; ++i) { + long offset = i * fileSize + segmentOffset; + byte[] expected = new byte[length]; + random.nextBytes(expected); + fileIOEngine.write(ByteBuffer.wrap(expected), offset); + try (RandomAccessFile raf = new RandomAccessFile(files[i], "r")) { + byte[] actual = new byte[length]; + raf.seek(segmentOffset); + raf.read(actual); + assertArrayEquals(String.format("off=%d, len=%d", offset, length), expected, actual); + } + } + } + + private void assertExceptionOnReadWhenCrossingSegments(long segmentOffset, int length) + throws Exception { + for (int i = 0; i < fileCount; ++i) { + try { + long offset = (i * fileSize) + segmentOffset; + BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer(); + fileIOEngine.read(offset, length, deserializer); + } catch (IllegalArgumentException e) { + continue; } + fail(String.format("off=%d, len=%d", segmentOffset, length)); } + } + private void 
assertExceptionOnWriteWhenCrossingSegments(long segmentOffset, int length) + throws Exception { + for (int i = 0; i < fileCount; ++i) { + try { + long offset = (i * fileSize) + segmentOffset; + fileIOEngine.write(ByteBuffer.allocate(length), offset); + } catch (IllegalArgumentException e) { + continue; + } + fail(String.format("off=%d, len=%d", segmentOffset, length)); + } } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java index dfc18c7..0bb853f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java @@ -18,6 +18,7 @@ */ package org.apache.hadoop.hbase.io.hfile.bucket; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.File; @@ -28,6 +29,8 @@ import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrab import org.apache.hadoop.hbase.nio.ByteBuff; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.After; +import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -36,33 +39,47 @@ import org.junit.experimental.categories.Category; */ @Category({IOTests.class, SmallTests.class}) public class TestFileMmapEngine { + final int size = 2 * 1024 * 1024; // 2 MB + final String filePath = "testFileMmapEngine"; + + @After + public void tearDown() throws Exception { + File file = new File(filePath); + if (file.exists()) { + file.delete(); + } + } + @Test public void testFileMmapEngine() throws IOException { - int size = 2 * 1024 * 1024; // 2 MB - String filePath = "testFileMmapEngine"; - try { - FileMmapEngine fileMmapEngine = new FileMmapEngine(filePath, size); - for (int i = 0; i < 50; i++) { - int len = (int) Math.floor(Math.random() * 100); - long offset = (long) Math.floor(Math.random() * size % (size - len)); - byte[] data1 = new byte[len]; - for (int j = 0; j < data1.length; ++j) { - data1[j] = (byte) (Math.random() * 255); - } - fileMmapEngine.write(ByteBuffer.wrap(data1), offset); - BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer(); - fileMmapEngine.read(offset, len, deserializer); - ByteBuff data2 = deserializer.getDeserializedByteBuff(); - for (int j = 0; j < data1.length; ++j) { - assertTrue(data1[j] == data2.get(j)); - } + FileMmapEngine fileMmapEngine = new FileMmapEngine(filePath, size); + for (int i = 0; i < 50; i++) { + int len = (int) Math.floor(Math.random() * 100); + long offset = (long) Math.floor(Math.random() * size % (size - len)); + byte[] data1 = new byte[len]; + for (int j = 0; j < data1.length; ++j) { + data1[j] = (byte) (Math.random() * 255); } - } finally { - File file = new File(filePath); - if (file.exists()) { - file.delete(); + fileMmapEngine.write(ByteBuffer.wrap(data1), offset); + BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer(); + fileMmapEngine.read(offset, len, deserializer); + ByteBuff data2 = deserializer.getDeserializedByteBuff(); + for (int j = 0; j < data1.length; ++j) { + assertTrue(data1[j] == data2.get(j)); } } + } + @Test + public void testIsSegmented() throws Exception { + IOEngine ioEngine = new FileMmapEngine(filePath, size); + assertFalse(ioEngine.isSegmented()); } + + @Test + public void 
testDoesAllocationCrossSegments() throws Exception { + IOEngine ioEngine = new FileMmapEngine(filePath, size); + assertFalse(ioEngine.allocationCrossedSegments(0, 100)); + } + } \ No newline at end of file -- 2.10.2
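
Usage note (illustrative, not part of the patch): with this change applied, the bucket cache can be backed by several files by giving the new "files:" prefix a comma-separated list of paths. The property names below are the standard HBase bucket cache settings; the paths and the size value are placeholders.

  <property>
    <name>hbase.bucketcache.ioengine</name>
    <value>files:/mnt/disk1/bucketcache,/mnt/disk2/bucketcache</value>
  </property>
  <property>
    <name>hbase.bucketcache.size</name>
    <value>8192</value>
  </property>

As implemented in the new FileIOEngine constructor, the requested capacity is divided evenly across the listed files, rounding up so that every segment is the same size.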