diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index ac842f6..e4093fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -426,7 +426,7 @@ public class BucketCache implements BlockCache, HeapSize {
     }
 
     if (backingMap.containsKey(cacheKey)) {
-      Cacheable existingBlock = getBlock(cacheKey, false, false, false);
+      Cacheable existingBlock = getBlockInternal(cacheKey, false, false, false, true);
       if (BlockCacheUtil.compareCacheBlock(cachedItem, existingBlock) != 0) {
         throw new RuntimeException("Cached block contents differ, which should not have happened."
             + "cacheKey:" + cacheKey);
@@ -478,6 +478,11 @@ public class BucketCache implements BlockCache, HeapSize {
   @Override
   public Cacheable getBlock(BlockCacheKey key, boolean caching, boolean repeat,
       boolean updateCacheMetrics) {
+    return getBlockInternal(key, caching, repeat, updateCacheMetrics, false);
+  }
+
+  private Cacheable getBlockInternal(BlockCacheKey key, boolean caching, boolean repeat,
+      boolean updateCacheMetrics, boolean forceCopy) {
     if (!cacheEnabled) {
       return null;
     }
@@ -506,7 +511,7 @@
           LOG.trace("Read offset=" + bucketEntry.offset() + ", len=" + len);
         }
         Cacheable cachedBlock = ioEngine.read(bucketEntry.offset(), len,
-            bucketEntry.deserializerReference(this.deserialiserMap));
+            bucketEntry.deserializerReference(this.deserialiserMap), forceCopy);
         long timeTaken = System.nanoTime() - start;
         if (updateCacheMetrics) {
           cacheStats.hit(caching, key.isPrimary(), key.getBlockType());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
index 9f4ffba..138a9b4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/ByteBufferIOEngine.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hbase.io.hfile.Cacheable;
 import org.apache.hadoop.hbase.io.hfile.CacheableDeserializer;
 import org.apache.hadoop.hbase.io.hfile.Cacheable.MemoryType;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.nio.SingleByteBuff;
 import org.apache.hadoop.hbase.util.ByteBufferAllocator;
 import org.apache.hadoop.hbase.util.ByteBufferArray;
@@ -103,16 +104,23 @@
   }
 
   @Override
-  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
-      throws IOException {
-    ByteBuff dstBuffer = bufferArray.asSubByteBuff(offset, length);
-    // Here the buffer that is created directly refers to the buffer in the actual buckets.
-    // When any cell is referring to the blocks created out of these buckets then it means that
-    // those cells are referring to a shared memory area which if evicted by the BucketCache would
-    // lead to corruption of results. Hence we set the type of the buffer as SHARED_MEMORY
-    // so that the readers using this block are aware of this fact and do the necessary action
-    // to prevent eviction till the results are either consumed or copied
-    return deserializer.deserialize(dstBuffer, true, MemoryType.SHARED);
+  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer,
+      boolean forceCopy) throws IOException {
+    if (forceCopy) {
+      byte[] dst = new byte[length];
+      bufferArray.getMultiple(offset, length, dst);
+      return deserializer.deserialize(new SingleByteBuff(ByteBuffer.wrap(dst)), true,
+          MemoryType.EXCLUSIVE);
+    } else {
+      ByteBuff dstBuffer = bufferArray.asSubByteBuff(offset, length);
+      // Here the buffer that is created directly refers to the buffer in the actual buckets.
+      // When any cell is referring to the blocks created out of these buckets then it means that
+      // those cells are referring to a shared memory area which if evicted by the BucketCache would
+      // lead to corruption of results. Hence we set the type of the buffer as SHARED_MEMORY
+      // so that the readers using this block are aware of this fact and do the necessary action
+      // to prevent eviction till the results are either consumed or copied
+      return deserializer.deserialize(dstBuffer, true, MemoryType.SHARED);
+    }
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index ad1c394..638978f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -124,8 +124,8 @@ public class FileIOEngine implements IOEngine {
    * @throws IOException
    */
   @Override
-  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
-      throws IOException {
+  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer,
+      boolean forceCopy) throws IOException {
     Preconditions.checkArgument(length >= 0, "Length of read can not be less than 0.");
     ByteBuffer dstBuffer = ByteBuffer.allocate(length);
     if (length != 0) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java
index 4fe39d3..8f0ac96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileMmapEngine.java
@@ -101,8 +101,8 @@ public class FileMmapEngine implements IOEngine {
   }
 
   @Override
-  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
-      throws IOException {
+  public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer,
+      boolean forceCopy) throws IOException {
     byte[] dst = new byte[length];
     bufferArray.getMultiple(offset, length, dst);
     return deserializer.deserialize(new SingleByteBuff(ByteBuffer.wrap(dst)), true,
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
index 61bde6b..6e934c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/IOEngine.java
@@ -42,12 +42,13 @@
    * @param length How many bytes to be read from the offset
    * @param offset The offset in the IO engine where the first byte to be read
    * @param deserializer The deserializer to be used to make a Cacheable from the data.
+   * @param forceCopy true if the cached data must be copied into an exclusive buffer
    * @return Cacheable
    * @throws IOException
    * @throws RuntimeException when the length of the ByteBuff read is less than 'length'
    */
-  Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
-      throws IOException;
+  Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer,
+      boolean forceCopy) throws IOException;
 
   /**
    * Transfers data from the given byte buffer to IOEngine
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
index d6f9d71..5827adb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBlockEvictionFromClient.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
@@ -39,6 +40,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
@@ -201,7 +204,6 @@ public class TestBlockEvictionFromClient {
     assertTrue(Bytes.equals(table.get(new Get(ROW)).value(), data));
     // data was in memstore so don't expect any changes
     // flush the data
-    System.out.println("Flushing cache in problematic area");
     // Should create one Hfile with 2 blocks
     region.flush(true);
     // Load cache
@@ -597,10 +599,14 @@
     region.flush(true);
     LOG.info("About to SPLIT on " + Bytes.toString(ROW1));
     TEST_UTIL.getAdmin().split(tableName, ROW1);
-    List<HRegionInfo> tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
     // Wait for splits
-    while (tableRegions.size() != 2) {
-      tableRegions = TEST_UTIL.getAdmin().getRegions(tableName);
+    Collection<ServerName> regionServers = TEST_UTIL.getAdmin().getRegionServers();
+    Iterator<ServerName> serverItr = regionServers.iterator();
+    assertTrue(serverItr.hasNext());
+    ServerName rs = serverItr.next();
+    List<HRegionInfo> onlineRegions = TEST_UTIL.getAdmin().getRegions(rs);
+    while (onlineRegions.size() != 2) {
+      onlineRegions = TEST_UTIL.getAdmin().getRegions(rs);
       Thread.sleep(100);
       LOG.info("Waiting on SPLIT to complete...");
     }
@@ -862,7 +868,7 @@
     testScanWithCompactionInternals(name.getMethodName(), false);
   }
 
-  @Ignore @Test
+  @Test
   public void testReverseScanWithCompaction() throws IOException, InterruptedException {
     testScanWithCompactionInternals(name.getMethodName(), true);
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
index ab2276a..544107a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestByteBufferIOEngine.java
@@ -69,7 +69,7 @@ public class TestByteBufferIOEngine {
     }
     ioEngine.write(srcBuffer, offset);
     BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
-    ioEngine.read(offset, blockSize, deserializer);
+    ioEngine.read(offset, blockSize, deserializer, false);
     ByteBuff dstBuffer = deserializer.buf;
     for (int j = 0; j < byteArray.length; ++j) {
       assertTrue(byteArray[j] == dstBuffer.get(j));
@@ -139,7 +139,7 @@
     }
     ioEngine.write(srcBuffer, offset);
     BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
-    ioEngine.read(offset, blockSize, deserializer);
+    ioEngine.read(offset, blockSize, deserializer, false);
     ByteBuff dstBuffer = deserializer.buf;
     for (int j = 0; j < byteArray.length; ++j) {
       assertTrue(srcBuffer.get(j) == dstBuffer.get(j));
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index 1bcc026..7872e97 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -102,7 +102,7 @@
     }
     fileIOEngine.write(ByteBuffer.wrap(data1), offset);
     BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
-    fileIOEngine.read(offset, len, deserializer);
+    fileIOEngine.read(offset, len, deserializer, true);
     ByteBuff data2 = deserializer.getDeserializedByteBuff();
     assertArrayEquals(data1, data2.array());
   }
@@ -114,7 +114,7 @@
 
     fileIOEngine.write(ByteBuffer.wrap(data1), 0);
     BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
-    fileIOEngine.read(0, 0, deserializer);
+    fileIOEngine.read(0, 0, deserializer, true);
     ByteBuff data2 = deserializer.getDeserializedByteBuff();
     assertArrayEquals(data1, data2.array());
   }
@@ -130,7 +130,7 @@
     }
     fileIOEngine.write(ByteBuffer.wrap(data1), offset);
     BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
-    fileIOEngine.read(offset, len, deserializer);
+    fileIOEngine.read(offset, len, deserializer, true);
     ByteBuff data2 = deserializer.getDeserializedByteBuff();
     assertArrayEquals(data1, data2.array());
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java
index 85bd4a2..df9d3ab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileMmapEngine.java
@@ -51,7 +51,7 @@ public class TestFileMmapEngine {
     }
     fileMmapEngine.write(ByteBuffer.wrap(data1), offset);
     BufferGrabbingDeserializer deserializer = new BufferGrabbingDeserializer();
-    fileMmapEngine.read(offset, len, deserializer);
+    fileMmapEngine.read(offset, len, deserializer, true);
     ByteBuff data2 = deserializer.getDeserializedByteBuff();
     for (int j = 0; j < data1.length; ++j) {
       assertTrue(data1[j] == data2.get(j));
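
Note on the contract this patch gives IOEngine.read: with forceCopy=false an engine may hand back a zero-copy view over its shared backing store, which the caller must keep pinned against eviction until the data is consumed or copied; with forceCopy=true it must return the data in a freshly allocated, exclusively owned buffer. The sketch below illustrates this outside HBase with stand-in types (ForceCopyReadSketch, its BACKING buffer, and the comments naming the MemoryType semantics are illustrative assumptions, not the real Cacheable/CacheableDeserializer API):

import java.nio.ByteBuffer;

// Simplified model of the forceCopy contract added to IOEngine.read.
public class ForceCopyReadSketch {

  // Stands in for the shared memory backing a ByteBufferIOEngine's buckets.
  private static final ByteBuffer BACKING = ByteBuffer.allocateDirect(1024);

  static ByteBuffer read(int offset, int length, boolean forceCopy) {
    if (forceCopy) {
      // EXCLUSIVE: copy into a fresh on-heap buffer. The block can now be
      // evicted from the cache without corrupting the returned data.
      byte[] dst = new byte[length];
      ByteBuffer src = BACKING.duplicate();
      src.position(offset);
      src.limit(offset + length);
      src.get(dst);
      return ByteBuffer.wrap(dst);
    }
    // SHARED: a zero-copy view over the backing store. The caller must keep
    // the block pinned until the data is consumed or copied elsewhere.
    ByteBuffer view = BACKING.duplicate();
    view.position(offset);
    view.limit(offset + length);
    return view.slice();
  }

  public static void main(String[] args) {
    ByteBuffer shared = read(0, 16, false);   // aliases cache memory, no allocation
    ByteBuffer exclusive = read(0, 16, true); // private copy, safe to hold long-term
    System.out.println(shared.isDirect() + " / " + exclusive.isDirect()); // true / false
  }
}

This also suggests why the duplicate-block check in BucketCache.cacheBlock switches to getBlockInternal(..., true): reading the existing block with forceCopy=true lets the content comparison run on a private copy rather than holding a reference into shared bucket memory while it compares.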