From d3b2fe536d1cbf47663e8ed9931d2372015c413c Mon Sep 17 00:00:00 2001 From: Jacob LeBlanc Date: Tue, 24 Sep 2019 13:45:51 -0400 Subject: [PATCH] HBASE-23066 - Added hbase.rs.prefetchcompactedblocksonwrite parameter for adding blocks to the cache during compaction when prefetchOnOpen is enabled for the column family --- .../apache/hadoop/hbase/io/hfile/CacheConfig.java | 22 ++++++++++ .../apache/hadoop/hbase/regionserver/HStore.java | 4 +- .../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 48 ++++++++++++++++++---- 3 files changed, 63 insertions(+), 11 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index bb57fbe..552df32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -81,6 +81,13 @@ public class CacheConfig { */ public static final String PREFETCH_BLOCKS_ON_OPEN_KEY = "hbase.rs.prefetchblocksonopen"; + /** + * Configuration key to cache blocks when a compacted file is written, predicated on prefetching being enabled for + * the column family. + */ + public static final String PREFETCH_COMPACTED_BLOCKS_ON_WRITE_KEY = + "hbase.rs.prefetchcompactedblocksonwrite"; + public static final String DROP_BEHIND_CACHE_COMPACTION_KEY = "hbase.hfile.drop.behind.compaction"; @@ -93,6 +100,7 @@ public class CacheConfig { public static final boolean DEFAULT_EVICT_ON_CLOSE = false; public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false; public static final boolean DEFAULT_PREFETCH_ON_OPEN = false; + public static final boolean DEFAULT_PREFETCH_COMPACTED_BLOCKS_ON_WRITE = false; public static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true; /** @@ -124,6 +132,9 @@ public class CacheConfig { /** Whether data blocks should be prefetched into the cache */ private final boolean prefetchOnOpen; + /** Whether data blocks should be cached when compacted file is written for column families with prefetching */ + private final boolean prefetchCompactedDataOnWrite; + private final boolean dropBehindCompaction; // Local reference to the block cache @@ -174,6 +185,8 @@ public class CacheConfig { (family == null ? false : family.isEvictBlocksOnClose()); this.prefetchOnOpen = conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN) || (family == null ? false : family.isPrefetchBlocksOnOpen()); + this.prefetchCompactedDataOnWrite = conf.getBoolean(PREFETCH_COMPACTED_BLOCKS_ON_WRITE_KEY, + DEFAULT_PREFETCH_COMPACTED_BLOCKS_ON_WRITE); this.blockCache = blockCache; this.byteBuffAllocator = byteBuffAllocator; LOG.info("Created cacheConfig: " + this + (family == null ? "" : " for family " + family) + @@ -193,6 +206,7 @@ public class CacheConfig { this.evictOnClose = cacheConf.evictOnClose; this.cacheDataCompressed = cacheConf.cacheDataCompressed; this.prefetchOnOpen = cacheConf.prefetchOnOpen; + this.prefetchCompactedDataOnWrite = cacheConf.prefetchCompactedDataOnWrite; this.dropBehindCompaction = cacheConf.dropBehindCompaction; this.blockCache = cacheConf.blockCache; this.byteBuffAllocator = cacheConf.byteBuffAllocator; @@ -207,6 +221,7 @@ public class CacheConfig { this.evictOnClose = false; this.cacheDataCompressed = false; this.prefetchOnOpen = false; + this.prefetchCompactedDataOnWrite = false; this.dropBehindCompaction = false; this.blockCache = null; this.byteBuffAllocator = ByteBuffAllocator.HEAP; @@ -320,6 +335,13 @@ public class CacheConfig { } /** + * @return true if blocks should be cached while writing during compaction, false if not + */ + public boolean shouldCacheCompactedBlocksOnWrite() { + return this.prefetchCompactedDataOnWrite && this.prefetchOnOpen; + } + + /** * Return true if we may find this type of block in block cache. *

* TODO: today {@code family.isBlockCacheEnabled()} only means {@code cacheDataOnRead}, so here we diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 447bc73..e693567 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1118,9 +1118,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat boolean shouldDropBehind) throws IOException { final CacheConfig writerCacheConf; if (isCompaction) { - // Don't cache data on write on compactions. + // Don't cache data on write on compactions, unless specifically configured to do so writerCacheConf = new CacheConfig(cacheConf); - writerCacheConf.setCacheDataOnWrite(false); + writerCacheConf.setCacheDataOnWrite(cacheConf.shouldCacheCompactedBlocksOnWrite()); } else { writerCacheConf = cacheConf; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index 3a769b0..d77fe3c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -53,6 +53,8 @@ import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding; import org.apache.hadoop.hbase.io.hfile.bucket.BucketCache; import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.HStoreFile; import org.apache.hadoop.hbase.regionserver.StoreFileWriter; import org.apache.hadoop.hbase.testclassification.IOTests; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -186,7 +188,7 @@ public class TestCacheOnWrite { for (CacheOnWriteType cowType : CacheOnWriteType.values()) { for (Compression.Algorithm compress : HBaseCommonTestingUtility.COMPRESSION_ALGORITHMS) { for (boolean cacheCompressedData : new boolean[] { false, true }) { - params.add(new Object[] { cowType, compress, cacheCompressedData, blockCache }); + params.add(new Object[] { cowType, compress, cacheCompressedData, blockCache}); } } } @@ -405,8 +407,12 @@ public class TestCacheOnWrite { storeFilePath = sfw.getPath(); } - private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags) + private void testCachingDataBlocksDuringCompactionInternals(boolean useTags, boolean prefetchCompactedBlocksOnWrite) throws IOException, InterruptedException { + + // Set the conf if testing caching compacted blocks on write + conf.setBoolean(CacheConfig.PREFETCH_COMPACTED_BLOCKS_ON_WRITE_KEY, prefetchCompactedBlocksOnWrite); + // TODO: need to change this test if we add a cache size threshold for // compactions, or if we implement some other kind of intelligent logic for // deciding what blocks to cache-on-write on compaction. @@ -415,8 +421,8 @@ public class TestCacheOnWrite { final byte[] cfBytes = Bytes.toBytes(cf); final int maxVersions = 3; ColumnFamilyDescriptor cfd = - ColumnFamilyDescriptorBuilder.newBuilder(cfBytes).setCompressionType(compress) - .setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions) + ColumnFamilyDescriptorBuilder.newBuilder(cfBytes).setPrefetchBlocksOnOpen(prefetchCompactedBlocksOnWrite) + .setCompressionType(compress).setBloomFilterType(BLOOM_TYPE).setMaxVersions(maxVersions) .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding()).build(); HRegion region = TEST_UTIL.createTestRegion(table, cfd, blockCache); int rowIdx = 0; @@ -448,16 +454,39 @@ public class TestCacheOnWrite { } region.flush(true); } + + // If prefetching is enabled, cancel all prefetches before clearing block cache + if (prefetchCompactedBlocksOnWrite) { + for (HStore store : region.getStores()) { + for (HStoreFile storeFile : store.getStorefiles()) { + PrefetchExecutor.cancel(storeFile.getPath()); + } + } + } clearBlockCache(blockCache); assertEquals(0, blockCache.getBlockCount()); + region.compact(false); LOG.debug("compactStores() returned"); + boolean dataBlockCached = false; for (CachedBlock block: blockCache) { - assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType()); - assertNotEquals(BlockType.DATA, block.getBlockType()); + if (BlockType.ENCODED_DATA.equals(block.getBlockType()) || BlockType.DATA.equals(block.getBlockType())) { + dataBlockCached = true; + break; + } } + + // Data blocks should be cached in instances where we are caching blocks on write. In the case of testing + // BucketCache, we cannot verify block type as it is not stored in the cache. + assertTrue("\nTest description: " + testDescription + "\nprefetchCompactedBlocksOnWrite: " + + prefetchCompactedBlocksOnWrite + "\n", (prefetchCompactedBlocksOnWrite && + !(blockCache instanceof BucketCache)) == dataBlockCached); + region.close(); + + // Clear the cache on write setting + conf.setBoolean(CacheConfig.PREFETCH_COMPACTED_BLOCKS_ON_WRITE_KEY, false); } @Test @@ -467,8 +496,9 @@ public class TestCacheOnWrite { } @Test - public void testNotCachingDataBlocksDuringCompaction() throws IOException, InterruptedException { - testNotCachingDataBlocksDuringCompactionInternals(false); - testNotCachingDataBlocksDuringCompactionInternals(true); + public void testCachingDataBlocksDuringCompaction() throws IOException, InterruptedException { + testCachingDataBlocksDuringCompactionInternals(false, false); + testCachingDataBlocksDuringCompactionInternals(true, false); + testCachingDataBlocksDuringCompactionInternals(true, true); } } -- 1.8.3.1