From d5cb0aadc30105f8f7e28ce3b8d8adb18180d2cc Mon Sep 17 00:00:00 2001
From: Viraj Jasani
Date: Thu, 20 Feb 2020 20:43:14 +0530
Subject: [PATCH] HBASE-23066 : Allow cache on write during compactions when
 prefetching is enabled

---
 .../hadoop/hbase/io/hfile/CacheConfig.java    |  36 ++++-
 .../hadoop/hbase/regionserver/HStore.java     |   4 +-
 .../hbase/io/hfile/TestCacheOnWrite.java      | 129 ++++++++++--------
 3 files changed, 108 insertions(+), 61 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 87eae04348..81ca56f8ab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -126,6 +126,12 @@ public class CacheConfig {
   public static final String PREFETCH_BLOCKS_ON_OPEN_KEY =
       "hbase.rs.prefetchblocksonopen";
 
+  /**
+   * Configuration key to cache blocks when a compacted file is written
+   */
+  public static final String CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY =
+      "hbase.rs.cachecompactedblocksonwrite";
+
   /**
    * The target block size used by blockcache instances. Defaults to
    * {@link HConstants#DEFAULT_BLOCKSIZE}.
@@ -183,6 +189,7 @@ public class CacheConfig {
   public static final boolean DEFAULT_EVICT_ON_CLOSE = false;
   public static final boolean DEFAULT_CACHE_DATA_COMPRESSED = false;
   public static final boolean DEFAULT_PREFETCH_ON_OPEN = false;
+  public static final boolean DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE = false;
 
   /** Local reference to the block cache, null if completely disabled */
   private final BlockCache blockCache;
@@ -216,6 +223,11 @@ public class CacheConfig {
   /** Whether data blocks should be prefetched into the cache */
   private final boolean prefetchOnOpen;
 
+  /**
+   * Whether data blocks should be cached when a compacted file is written
+   */
+  private final boolean cacheCompactedDataOnWrite;
+
   /**
    * If true and if more than one tier in this cache deploy -- e.g. CombinedBlockCache has an L1
   * and an L2 tier -- then cache data blocks up in the L1 tier (The meta blocks are likely being
@@ -250,8 +262,10 @@ public class CacheConfig {
             DEFAULT_PREFETCH_ON_OPEN) || family.isPrefetchBlocksOnOpen(),
         conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1,
             HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) || family.isCacheDataInL1(),
-        conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY,DROP_BEHIND_CACHE_COMPACTION_DEFAULT)
-        );
+        conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY, DROP_BEHIND_CACHE_COMPACTION_DEFAULT),
+        conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
+            DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE)
+        );
     LOG.info("Created cacheConfig for " + family.getNameAsString() + ": " + this);
   }
 
@@ -275,7 +289,9 @@ public class CacheConfig {
         conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN),
         conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1,
             HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1),
-        conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY,DROP_BEHIND_CACHE_COMPACTION_DEFAULT)
+        conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY, DROP_BEHIND_CACHE_COMPACTION_DEFAULT),
+        conf.getBoolean(CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
+            DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE)
         );
     LOG.info("Created cacheConfig: " + this);
   }
@@ -301,7 +317,8 @@ public class CacheConfig {
       final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite,
       final boolean cacheBloomsOnWrite, final boolean evictOnClose,
       final boolean cacheDataCompressed, final boolean prefetchOnOpen,
-      final boolean cacheDataInL1, final boolean dropBehindCompaction) {
+      final boolean cacheDataInL1, final boolean dropBehindCompaction,
+      final boolean cacheCompactedDataOnWrite) {
     this.blockCache = blockCache;
     this.cacheDataOnRead = cacheDataOnRead;
     this.inMemory = inMemory;
@@ -313,6 +330,7 @@ public class CacheConfig {
     this.prefetchOnOpen = prefetchOnOpen;
     this.cacheDataInL1 = cacheDataInL1;
     this.dropBehindCompaction = dropBehindCompaction;
+    this.cacheCompactedDataOnWrite = cacheCompactedDataOnWrite;
   }
 
   /**
@@ -324,7 +342,15 @@ public class CacheConfig {
         cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite,
         cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose,
         cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen,
-        cacheConf.cacheDataInL1, cacheConf.dropBehindCompaction);
+        cacheConf.cacheDataInL1, cacheConf.dropBehindCompaction,
+        cacheConf.cacheCompactedDataOnWrite);
+  }
+
+  /**
+   * @return true if blocks should be cached while writing during compaction, false if not
+   */
+  public boolean shouldCacheCompactedBlocksOnWrite() {
+    return this.cacheCompactedDataOnWrite;
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index d7eadecc80..92a9ea6555 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1111,9 +1111,9 @@ public class HStore implements Store {
       throws IOException {
     final CacheConfig writerCacheConf;
     if (isCompaction) {
-      // Don't cache data on write on compactions.
+      // Don't cache data on write on compactions, unless specifically configured to do so.
       writerCacheConf = new CacheConfig(cacheConf);
-      writerCacheConf.setCacheDataOnWrite(false);
+      writerCacheConf.setCacheDataOnWrite(cacheConf.shouldCacheCompactedBlocksOnWrite());
     } else {
       writerCacheConf = cacheConf;
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 7448ddd9c8..0bbae69ce0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -224,7 +224,7 @@ public class TestCacheOnWrite {
     new CacheConfig(blockCache, true, true, cowType.shouldBeCached(BlockType.DATA),
         cowType.shouldBeCached(BlockType.LEAF_INDEX),
         cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData,
-        false, false, false);
+        false, false, false, false);
   }
 
   @After
@@ -381,66 +381,87 @@ public class TestCacheOnWrite {
     storeFilePath = sfw.getPath();
   }
 
-  private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags)
+  private void testNotCachingDataBlocksDuringCompactionInternals(boolean useTags,
+      boolean cacheBlocksOnCompaction)
       throws IOException, InterruptedException {
-    if (useTags) {
-      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 3);
-    } else {
-      TEST_UTIL.getConfiguration().setInt("hfile.format.version", 2);
-    }
-    // TODO: need to change this test if we add a cache size threshold for
-    // compactions, or if we implement some other kind of intelligent logic for
-    // deciding what blocks to cache-on-write on compaction.
-    final String table = "CompactionCacheOnWrite";
-    final String cf = "myCF";
-    final byte[] cfBytes = Bytes.toBytes(cf);
-    final int maxVersions = 3;
-    Region region = TEST_UTIL.createTestRegion(table,
+    // Save the current value of the conf key so it can be restored after the test
+    boolean localValue = conf.getBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
+      false);
+    try {
+      // Set whether blocks written during compaction should be cached for this run
+      conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY,
+        cacheBlocksOnCompaction);
+
+      // TODO: need to change this test if we add a cache size threshold for
+      // compactions, or if we implement some other kind of intelligent logic for
+      // deciding what blocks to cache-on-write on compaction.
+      final String table = "CompactionCacheOnWrite";
+      final String cf = "myCF";
+      final byte[] cfBytes = Bytes.toBytes(cf);
+      final int maxVersions = 3;
+      Region region = TEST_UTIL.createTestRegion(table,
         new HColumnDescriptor(cf)
-          .setCompressionType(compress)
-          .setBloomFilterType(BLOOM_TYPE)
-          .setMaxVersions(maxVersions)
-          .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
-      );
-    int rowIdx = 0;
-    long ts = EnvironmentEdgeManager.currentTime();
-    for (int iFile = 0; iFile < 5; ++iFile) {
-      for (int iRow = 0; iRow < 500; ++iRow) {
-        String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" +
-            iRow;
-        Put p = new Put(Bytes.toBytes(rowStr));
-        ++rowIdx;
-        for (int iCol = 0; iCol < 10; ++iCol) {
-          String qualStr = "col" + iCol;
-          String valueStr = "value_" + rowStr + "_" + qualStr;
-          for (int iTS = 0; iTS < 5; ++iTS) {
-            if (useTags) {
-              Tag t = new Tag((byte) 1, "visibility");
-              Tag[] tags = new Tag[1];
-              tags[0] = t;
-              KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
+            .setCompressionType(compress)
+            .setBloomFilterType(BLOOM_TYPE)
+            .setMaxVersions(maxVersions)
+            .setDataBlockEncoding(NoOpDataBlockEncoder.INSTANCE.getDataBlockEncoding())
+      );
+      int rowIdx = 0;
+      long ts = EnvironmentEdgeManager.currentTime();
+      for (int iFile = 0; iFile < 5; ++iFile) {
+        for (int iRow = 0; iRow < 500; ++iRow) {
+          String rowStr = "" + (rowIdx * rowIdx * rowIdx) + "row" + iFile + "_" + iRow;
+          Put p = new Put(Bytes.toBytes(rowStr));
+          ++rowIdx;
+          for (int iCol = 0; iCol < 10; ++iCol) {
+            String qualStr = "col" + iCol;
+            String valueStr = "value_" + rowStr + "_" + qualStr;
+            for (int iTS = 0; iTS < 5; ++iTS) {
+              if (useTags) {
+                Tag t = new Tag((byte) 1, "visibility");
+                Tag[] tags = new Tag[1];
+                tags[0] = t;
+                KeyValue kv = new KeyValue(Bytes.toBytes(rowStr), cfBytes, Bytes.toBytes(qualStr),
                   HConstants.LATEST_TIMESTAMP, Bytes.toBytes(valueStr), tags);
-          p.add(kv);
-        } else {
-          p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
+                p.add(kv);
+              } else {
+                p.addColumn(cfBytes, Bytes.toBytes(qualStr), ts++, Bytes.toBytes(valueStr));
+              }
             }
           }
-      p.setDurability(Durability.ASYNC_WAL);
-      region.put(p);
+          p.setDurability(Durability.ASYNC_WAL);
+          region.put(p);
         }
-    region.flush(true);
-    }
-    clearBlockCache(blockCache);
-    assertEquals(0, blockCache.getBlockCount());
-    region.compact(false);
-    LOG.debug("compactStores() returned");
-    for (CachedBlock block: blockCache) {
-      assertNotEquals(BlockType.ENCODED_DATA, block.getBlockType());
-      assertNotEquals(BlockType.DATA, block.getBlockType());
+        region.flush(true);
+      }
+      clearBlockCache(blockCache);
+      assertEquals(0, blockCache.getBlockCount());
+
+      region.compact(false);
+      LOG.debug("compactStores() returned");
+
+      boolean dataBlockCached = false;
+      for (CachedBlock block : blockCache) {
+        if (BlockType.ENCODED_DATA.equals(block.getBlockType())
+            || BlockType.DATA.equals(block.getBlockType())) {
+          dataBlockCached = true;
+          break;
+        }
+      }
+      // Data blocks should be cached in instances where we are caching blocks on write. In the
+      // case of testing BucketCache, we cannot verify the block type as it is not stored in the
+      // cache.
+      assertTrue(
+          "\nTest description: " + testDescription + "\ncacheBlocksOnCompaction: "
+              + cacheBlocksOnCompaction + "\n",
+          (cacheBlocksOnCompaction && !(blockCache instanceof BucketCache)) == dataBlockCached);
+
+      ((HRegion)region).close();
+    } finally {
+      // Reset the conf key back to the value saved above
+      conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, localValue);
     }
-    ((HRegion)region).close();
   }
 
   @Test
@@ -451,7 +472,7 @@ public class TestCacheOnWrite {
   @Test
   public void testNotCachingDataBlocksDuringCompaction()
       throws IOException, InterruptedException {
-    testNotCachingDataBlocksDuringCompactionInternals(false);
-    testNotCachingDataBlocksDuringCompactionInternals(true);
+    testNotCachingDataBlocksDuringCompactionInternals(false, false);
+    testNotCachingDataBlocksDuringCompactionInternals(true, true);
   }
 }
-- 
2.17.2 (Apple Git-113)
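
Usage note (reviewer addition, not part of the patch): the new flag is read from the
server-side Configuration, so it can be enabled by setting
hbase.rs.cachecompactedblocksonwrite to true in hbase-site.xml and restarting the
region server, or programmatically as sketched below. This is a minimal illustration,
not the patch's own code: the class name is hypothetical, while
CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY and shouldCacheCompactedBlocksOnWrite()
come from the CacheConfig hunks above.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.io.hfile.CacheConfig;

    // Hypothetical illustration class; only the CacheConfig API comes from this patch.
    public class CacheCompactedBlocksOnWriteExample {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Opt in to caching data blocks as compacted files are written. The default
        // (DEFAULT_CACHE_COMPACTED_BLOCKS_ON_WRITE) is false, which preserves the
        // pre-patch behaviour of never caching data blocks on compaction writes.
        conf.setBoolean(CacheConfig.CACHE_COMPACTED_BLOCKS_ON_WRITE_KEY, true);

        // HStore consults this accessor when it builds the writer CacheConfig for a
        // compaction (see the HStore hunk above):
        //   writerCacheConf.setCacheDataOnWrite(cacheConf.shouldCacheCompactedBlocksOnWrite());
        CacheConfig cacheConf = new CacheConfig(conf);
        System.out.println("Cache compacted blocks on write: "
            + cacheConf.shouldCacheCompactedBlocksOnWrite());
      }
    }

As the subject line suggests, this setting is mainly interesting alongside
hbase.rs.prefetchblocksonopen (PREFETCH_BLOCKS_ON_OPEN_KEY in the first hunk): when
prefetch-on-open would pull a freshly compacted file into the cache anyway, caching its
blocks at write time avoids re-reading them from the filesystem after the compaction.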