From 41009b930b011ef028f05774e7a8a4e3f23871b8 Mon Sep 17 00:00:00 2001 From: Sam An Date: Tue, 28 May 2019 19:41:06 -0700 Subject: [PATCH] Hive-21787: Metastore table cache enhancement (Sam An, reviewed by Daniel Dai) --- .../hive/metastore/cache/CacheUtils.java | 2 +- .../hive/metastore/cache/CachedStore.java | 57 +- .../hive/metastore/cache/SharedCache.java | 557 +++++++++++++----- .../hive/metastore/cache/TestCachedStore.java | 202 ++++++- 4 files changed, 646 insertions(+), 172 deletions(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java index d50fa137f0..bb673f428f 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java @@ -65,7 +65,7 @@ public static String buildTableColKey(String catName, String dbName, String tabl return buildKey(catName, dbName, tableName, colName); } - private static String buildKey(String... elements) { + public static String buildKey(String... elements) { return org.apache.commons.lang.StringUtils.join(elements, delimit); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index 1552ea0b8d..820f52bc3e 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -120,7 +120,7 @@ private Configuration conf; private static boolean areTxnStatsSupported; private PartitionExpressionProxy expressionProxy = null; - private static SharedCache sharedCache = new SharedCache(); + private static SharedCache sharedCache = null; private static boolean canUseEvents = false; private static long lastEventId; @@ -140,9 +140,16 @@ public void setConf(Configuration conf) { * @param conf */ void setConfForTest(Configuration conf) { + setConfForTestExceptSharedCache(conf); + initSharedCache(conf); + } + + void setConfForTestExceptSharedCache(Configuration conf){ setConfInternal(conf); initBlackListWhiteList(conf); - initSharedCache(conf); + } + void setSharedCache(SharedCache sc){ + sharedCache = sc; } synchronized private static void triggerUpdateUsingEvent(RawStore rawStore) { @@ -198,18 +205,15 @@ private void setConfInternal(Configuration conf) { } private void initSharedCache(Configuration conf) { - long maxSharedCacheSizeInBytes = - MetastoreConf.getSizeVar(conf, ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY); - sharedCache.initialize(maxSharedCacheSizeInBytes); - if (maxSharedCacheSizeInBytes > 0) { - LOG.info("Maximum memory that the cache will use: {} KB", - maxSharedCacheSizeInBytes / (1024)); - } + SharedCache.Builder builder = new SharedCache.Builder(); + sharedCache = builder + .configuration(conf) + .build(); } @VisibleForTesting public static SharedCache getSharedCache() { - return sharedCache; + return sharedCache; } static private ColumnStatistics updateStatsForAlterPart(RawStore rawStore, Table before, String catalogName, @@ -1209,7 +1213,11 @@ public Table getTable(String catName, String dbName, String tblName, String vali // let's move this table to the top of tblNamesBeingPrewarmed stack, // so that it gets loaded to the cache faster and is available for subsequent requests tblsPendingPrewarm.prioritizeTableForPrewarm(tblName); - return rawStore.getTable(catName, dbName, tblName, validWriteIds); + Table t = rawStore.getTable(catName, dbName, tblName, validWriteIds); + if (t != null) { + sharedCache.addTableToCache(catName, dbName, tblName, t); + } + return t; } if (validWriteIds != null) { tbl.setParameters( @@ -1450,23 +1458,13 @@ public void updateCreationMetadata(String catName, String dbname, String tablena @Override public List getTables(String catName, String dbName, String pattern) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() || - (canUseEvents && rawStore.isActiveTransaction())) { - return rawStore.getTables(catName, dbName, pattern); - } - return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), pattern, -1); + return rawStore.getTables(catName, dbName, pattern); } @Override public List getTables(String catName, String dbName, String pattern, TableType tableType, int limit) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get()|| !isCachedAllMetadata.get() - || (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getTables(catName, dbName, pattern, tableType, limit); - } - return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName), pattern, tableType, limit); } @Override @@ -1484,14 +1482,7 @@ public void updateCreationMetadata(String catName, String dbname, String tablena @Override public List getTableMeta(String catName, String dbNames, String tableNames, List tableTypes) throws MetaException { - // TODO Check if all required tables are allowed, if so, get it from cache - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() || - (canUseEvents && rawStore.isActiveTransaction())) { - return rawStore.getTableMeta(catName, dbNames, tableNames, tableTypes); - } - return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbNames), - StringUtils.normalizeIdentifier(tableNames), tableTypes); + return rawStore.getTableMeta(catName, dbNames, tableNames, tableTypes); } @Override @@ -1523,6 +1514,7 @@ public void updateCreationMetadata(String catName, String dbname, String tablena Table tbl = sharedCache.getTableFromCache(catName, dbName, tblName); if (tbl == null) { tbl = rawStore.getTable(catName, dbName, tblName); + sharedCache.addTableToCache(catName, dbName, tblName, tbl); } if (tbl != null) { tables.add(tbl); @@ -1534,12 +1526,7 @@ public void updateCreationMetadata(String catName, String dbname, String tablena @Override public List getAllTables(String catName, String dbName) throws MetaException { - if (!isBlacklistWhitelistEmpty(conf) || !isCachePrewarmed.get() || !isCachedAllMetadata.get() || - (canUseEvents && rawStore.isActiveTransaction())) { return rawStore.getAllTables(catName, dbName); - } - return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(catName), - StringUtils.normalizeIdentifier(dbName)); } @Override diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index 2c7354a881..85b1e8652a 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -17,29 +17,24 @@ */ package org.apache.hadoop.hive.metastore.cache; +import java.lang.reflect.Field; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.TreeMap; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheStats; +import com.google.common.cache.Weigher; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidWriteIdList; -import org.apache.hadoop.hive.metastore.TableType; -import org.apache.hadoop.hive.metastore.Warehouse; -import org.apache.hadoop.hive.metastore.HiveMetaException; -import org.apache.hadoop.hive.metastore.ObjectStore; -import org.apache.hadoop.hive.metastore.StatObjectConverter; +import org.apache.hadoop.hive.metastore.*; import org.apache.hadoop.hive.metastore.api.AggrStats; import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.Catalog; @@ -52,17 +47,18 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableMeta; +import org.apache.hadoop.hive.metastore.conf.MetastoreConf; import org.apache.hadoop.hive.metastore.txn.TxnUtils; import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator; import org.apache.hadoop.hive.ql.util.IncrementalObjectSizeEstimator.ObjectEstimator; +import org.eclipse.jetty.util.ConcurrentHashSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; -import static org.apache.hadoop.hive.metastore.cache.CachedStore.partNameToVals; import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier; public class SharedCache { @@ -79,7 +75,9 @@ private AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false); // For caching TableWrapper objects. Key is aggregate of database name and table name - private Map tableCache = new TreeMap<>(); + private ReentrantReadWriteLock tableCacheRWLock = new ReentrantReadWriteLock(true); + private Cache tableCache = null; + private boolean isTableCachePrewarmed = false; private HashSet tablesDeletedDuringPrewarm = new HashSet<>(); private AtomicBoolean isTableCacheDirty = new AtomicBoolean(false); @@ -90,6 +88,9 @@ private static long maxCacheSizeInBytes = -1; private static long currentCacheSizeInBytes = 0; private static HashMap, ObjectEstimator> sizeEstimators = null; + private Set tableToUpdateSize = new ConcurrentHashSet<>(); + private ScheduledExecutorService executor = null; + private Map tableSizeMap = null; enum StatsType { ALL(0), ALLBUTDEFAULT(1), PARTIAL(2); @@ -104,6 +105,12 @@ public int getPosition() { return position; } } + private enum MemberName{ + TABLE_COL_STATS_CACHE, + PARTITION_CACHE, + PARTITION_COL_STATS_CACHE, + AGGR_COL_STATS_CACHE + } static { try { @@ -113,13 +120,114 @@ public int getPosition() { } } + static class TableWrapperSizeUpdater implements Runnable{ + private Set setToUpdate; + ReentrantReadWriteLock lock; + Cache cache; + + TableWrapperSizeUpdater(Set set, ReentrantReadWriteLock lock1, Cache cache1){ + setToUpdate = set; + lock = lock1; + cache = cache1; + } + + @Override public void run() { + for(String s: setToUpdate){ + refreshTableWrapperInCache(s); + } + setToUpdate.clear(); + } + + void refreshTableWrapperInCache(String tblKey){ + try { + lock.writeLock().lock(); + TableWrapper tw = cache.getIfPresent(tblKey); + if (tw != null) { + //cache will re-weigh the TableWrapper and record new weight. + cache.put(tblKey, tw); + } + }finally { + lock.writeLock().unlock(); + } + } + } + + public static class Builder{ + private Map tableSizeMap = null; + private int concurrencyLevel = -1; + private long maxBytes; + private int refreshInterval = 10000; + private Configuration conf; + + Builder tableSizeMap(Map mp){ + this.tableSizeMap = mp; + return this; + } + + Builder configuration(Configuration c){ + this.conf = c; + return this; + } + Builder concurrencyLevel(int cl){ + this.concurrencyLevel = cl; + return this; + } + + Builder maxSharedCacheSizeInBytes(long b){ + maxBytes = b; + return this; + } + + Builder refreshInterval(int numMillis){ + this.refreshInterval = numMillis; + return this; + } + + public SharedCache build(){ + SharedCache sc = new SharedCache(); + sc.tableSizeMap = this.tableSizeMap; + sc.initialize(conf, refreshInterval, concurrencyLevel); + return sc; + } + } - public void initialize(long maxSharedCacheSizeInBytes) { + public void initialize(Configuration conf, int refreshInterval, int concurrencyLevel) { + long maxSharedCacheSizeInBytes = + MetastoreConf.getSizeVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY); maxCacheSizeInBytes = maxSharedCacheSizeInBytes; + // Create estimators if ((maxCacheSizeInBytes > 0) && (sizeEstimators == null)) { sizeEstimators = IncrementalObjectSizeEstimator.createEstimators(SharedCache.class); } + + if(tableCache == null) { + CacheBuilder b = CacheBuilder.newBuilder() + .maximumWeight(maxSharedCacheSizeInBytes > 0 ? maxSharedCacheSizeInBytes : 1024 * 1024) + .weigher(new Weigher() { + @Override public int weigh(String key, TableWrapper value) { + return value.getSize(); + } + }); + + if (concurrencyLevel > 0){ + b.concurrencyLevel(concurrencyLevel); + } + + tableCache = b.recordStats().build(); + } + + + executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { + @Override public Thread newThread(Runnable r) { + Thread t = Executors.defaultThreadFactory().newThread(r); + t.setName("SharedCache table size updater: Thread-" + t.getId()); + t.setDaemon(true); + return t; + } + }); + executor.scheduleAtFixedRate(new TableWrapperSizeUpdater(tableToUpdateSize, tableCacheRWLock, tableCache), 0, refreshInterval, TimeUnit.MILLISECONDS); + } private static ObjectEstimator getMemorySizeEstimator(Class clazz) { @@ -131,11 +239,36 @@ private static ObjectEstimator getMemorySizeEstimator(Class clazz) { return estimator; } - static class TableWrapper { + public static int getObjectSize(Class clazz, Object obj){ + if (sizeEstimators == null){ + return 0; + } + + try { + ObjectEstimator oe = getMemorySizeEstimator(clazz); + return oe.estimate(obj, sizeEstimators); + } catch (Exception e) { + LOG.error("Error while getting object size.", e); + } + return 0; + } + + enum SizeMode{ + Delta, + Snapshot + } + + class TableWrapper { Table t; String location; Map parameters; byte[] sdHash; + int otherSize; + int tableColStatsCacheSize; + int partitionCacheSize; + int partitionColStatsCacheSize; + int aggrColStatsCacheSize; + ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock(true); // For caching column stats for an unpartitioned table // Key is column name and the value is the col stat object @@ -164,6 +297,55 @@ private static ObjectEstimator getMemorySizeEstimator(Class clazz) { this.sdHash = sdHash; this.location = location; this.parameters = parameters; + this.tableColStatsCacheSize = 0; + this.partitionCacheSize = 0; + this.partitionColStatsCacheSize = 0; + this.aggrColStatsCacheSize = 0; + this.otherSize = getTableWrapperSizeWithoutMaps(); + } + + private int getTableWrapperSizeWithoutMaps(){ + Class clazz = TableWrapper.class; + Field[] fields = clazz.getDeclaredFields(); + int size = 0; + for (Field field : fields){ + if ( field.getType().equals(ConcurrentHashMap.class)){ + continue; + } + if ( field.getType().equals(SharedCache.class)){ + continue; + } + try { + field.setAccessible(true); + Object val = field.get(this); + ObjectEstimator oe = getMemorySizeEstimator(field.getType()); + if (oe != null) { + size += oe.estimate(val, sizeEstimators); + } + }catch(Exception ex){ + LOG.error("Not able to estimate size.", ex); + } + } + + return size; + } + + public int getSize(){ + //facilitate testing only. In production we won't use tableSizeMap at all. + if (tableSizeMap != null){ + String tblKey = CacheUtils.buildTableKey(this.t.getCatName(), this.t.getDbName(), this.t.getTableName()); + if (tableSizeMap.containsKey(tblKey)){ + return tableSizeMap.get(tblKey); + } + } + if(sizeEstimators == null){ + return 0; + } + return otherSize + + tableColStatsCacheSize + + partitionCacheSize + + partitionColStatsCacheSize + + aggrColStatsCacheSize; } public Table getTable() { @@ -202,15 +384,69 @@ boolean sameDatabase(String catName, String dbName) { return catName.equals(t.getCatName()) && dbName.equals(t.getDbName()); } + private void updateMemberSize(MemberName mn, Integer size, SizeMode mode){ + if( sizeEstimators == null){ + return; + } + + switch (mn) { + case TABLE_COL_STATS_CACHE: + if (mode == SizeMode.Delta) { + tableColStatsCacheSize += size; + }else{ + tableColStatsCacheSize = size; + } + break; + case PARTITION_CACHE: + if (mode == SizeMode.Delta) { + partitionCacheSize += size; + }else{ + partitionCacheSize = size; + } + break; + case PARTITION_COL_STATS_CACHE: + if (mode == SizeMode.Delta) { + partitionColStatsCacheSize += size; + }else{ + partitionColStatsCacheSize = size; + } + break; + case AGGR_COL_STATS_CACHE: + if (mode == SizeMode.Delta) { + aggrColStatsCacheSize += size; + }else{ + aggrColStatsCacheSize = size; + } + break; + default: + break; + } + + String tblKey = getTblKey(); + tableToUpdateSize.add(tblKey); + } + + String getTblKey(){ + Table t = this.t; + String catName = t.getCatName(); + String dbName = t.getDbName(); + String tblName = t.getTableName(); + return CacheUtils.buildTableKey(catName, dbName, tblName); + } + void cachePartition(Partition part, SharedCache sharedCache) { try { tableLock.writeLock().lock(); PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache); partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), wrapper); + int size = getObjectSize(PartitionWrapper.class, wrapper); + updateMemberSize(MemberName.PARTITION_CACHE, size, SizeMode.Delta); isPartitionCacheDirty.set(true); + // Invalidate cached aggregate stats if (!aggrColStatsCache.isEmpty()) { aggrColStatsCache.clear(); + updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, 0, SizeMode.Snapshot); } } finally { tableLock.writeLock().unlock(); @@ -220,31 +456,21 @@ void cachePartition(Partition part, SharedCache sharedCache) { boolean cachePartitions(Iterable parts, SharedCache sharedCache, boolean fromPrewarm) { try { tableLock.writeLock().lock(); + int size = 0; for (Partition part : parts) { - PartitionWrapper ptnWrapper = makePartitionWrapper(part, sharedCache); - if (maxCacheSizeInBytes > 0) { - ObjectEstimator ptnWrapperSizeEstimator = - getMemorySizeEstimator(PartitionWrapper.class); - long estimatedMemUsage = ptnWrapperSizeEstimator.estimate(ptnWrapper, sizeEstimators); - LOG.trace("Memory needed to cache Partition: {} is {} bytes", part, estimatedMemUsage); - if (isCacheMemoryFull(estimatedMemUsage)) { - LOG.debug( - "Cannot cache Partition: {}. Memory needed is {} bytes, whereas the memory remaining is: {} bytes.", - part, estimatedMemUsage, (0.8 * maxCacheSizeInBytes - currentCacheSizeInBytes)); - return false; - } else { - currentCacheSizeInBytes += estimatedMemUsage; - } - LOG.trace("Current cache size: {} bytes", currentCacheSizeInBytes); - } - partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), ptnWrapper); + PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache); + partitionCache.put(CacheUtils.buildPartitionCacheKey(part.getValues()), wrapper); + size += getObjectSize(PartitionWrapper.class, wrapper); + if (!fromPrewarm) { isPartitionCacheDirty.set(true); } } + updateMemberSize(MemberName.PARTITION_CACHE, size, SizeMode.Delta); // Invalidate cached aggregate stats if (!aggrColStatsCache.isEmpty()) { aggrColStatsCache.clear(); + updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, 0, SizeMode.Snapshot); } return true; } finally { @@ -306,6 +532,10 @@ public Partition removePartition(List partVal, SharedCache sharedCache) return null; } isPartitionCacheDirty.set(true); + + int size = getObjectSize(PartitionWrapper.class, wrapper); + updateMemberSize(MemberName.PARTITION_CACHE, -1 * size, SizeMode.Delta); + part = CacheUtils.assemble(wrapper, sharedCache); if (wrapper.getSdHash() != null) { sharedCache.decrSd(wrapper.getSdHash()); @@ -318,12 +548,16 @@ public Partition removePartition(List partVal, SharedCache sharedCache) Entry entry = iterator.next(); String key = entry.getKey(); if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + int statsSize = getObjectSize(ColumnStatisticsObj.class, entry.getValue()); + updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, -1 * statsSize, SizeMode.Delta); iterator.remove(); } } + // Invalidate cached aggregate stats if (!aggrColStatsCache.isEmpty()) { aggrColStatsCache.clear(); + updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, 0, SizeMode.Snapshot); } } finally { tableLock.writeLock().unlock(); @@ -352,6 +586,8 @@ public void alterPartition(List partVals, Partition newPart, SharedCache } } + + public void alterPartitionAndStats(List partVals, SharedCache sharedCache, long writeId, Map parameters, List colStatsObjs) { try { @@ -390,6 +626,7 @@ public void refreshPartitions(List partitions, SharedCache sharedCach Map newPartitionCache = new HashMap(); try { tableLock.writeLock().lock(); + int size = 0; for (Partition part : partitions) { if (isPartitionCacheDirty.compareAndSet(true, false)) { LOG.debug("Skipping partition cache update for table: " + getTable().getTableName() @@ -405,8 +642,10 @@ public void refreshPartitions(List partitions, SharedCache sharedCach } wrapper = makePartitionWrapper(part, sharedCache); newPartitionCache.put(key, wrapper); + size += getObjectSize(PartitionWrapper.class, wrapper); } partitionCache = newPartitionCache; + updateMemberSize(MemberName.PARTITION_CACHE, size, SizeMode.Snapshot); } finally { tableLock.writeLock().unlock(); } @@ -415,6 +654,7 @@ public void refreshPartitions(List partitions, SharedCache sharedCach public boolean updateTableColStats(List colStatsForTable) { try { tableLock.writeLock().lock(); + int statsSize = 0; for (ColumnStatisticsObj colStatObj : colStatsForTable) { // Get old stats object if present String key = colStatObj.getColName(); @@ -425,28 +665,11 @@ public boolean updateTableColStats(List colStatsForTable) { } else { // No stats exist for this key; add a new object to the cache // TODO: get rid of deepCopy after making sure callers don't use references - if (maxCacheSizeInBytes > 0) { - ObjectEstimator tblColStatsSizeEstimator = - getMemorySizeEstimator(ColumnStatisticsObj.class); - long estimatedMemUsage = - tblColStatsSizeEstimator.estimate(colStatObj, sizeEstimators); - LOG.trace("Memory needed to cache Table Column Statistics Object: {} is {} bytes", - colStatObj, estimatedMemUsage); - if (isCacheMemoryFull(estimatedMemUsage)) { - LOG.debug( - "Cannot cache Table Column Statistics Object: {}. Memory needed is {} bytes, " - + "whereas the memory remaining is: {} bytes.", - colStatObj, estimatedMemUsage, - (0.8 * maxCacheSizeInBytes - currentCacheSizeInBytes)); - return false; - } else { - currentCacheSizeInBytes += estimatedMemUsage; - } - LOG.trace("Current cache size: {} bytes", currentCacheSizeInBytes); - } tableColStatsCache.put(key, colStatObj.deepCopy()); + statsSize += getObjectSize(ColumnStatisticsObj.class, colStatObj); } } + updateMemberSize(MemberName.TABLE_COL_STATS_CACHE, statsSize, SizeMode.Delta); isTableColStatsCacheDirty.set(true); return true; } finally { @@ -459,6 +682,7 @@ public void refreshTableColStats(List colStatsForTable) { new HashMap(); try { tableLock.writeLock().lock(); + int statsSize = 0; for (ColumnStatisticsObj colStatObj : colStatsForTable) { if (isTableColStatsCacheDirty.compareAndSet(true, false)) { LOG.debug("Skipping table col stats cache update for table: " @@ -468,8 +692,10 @@ public void refreshTableColStats(List colStatsForTable) { String key = colStatObj.getColName(); // TODO: get rid of deepCopy after making sure callers don't use references newTableColStatsCache.put(key, colStatObj.deepCopy()); + statsSize += getObjectSize(ColumnStatisticsObj.class, colStatObj); } tableColStatsCache = newTableColStatsCache; + updateMemberSize(MemberName.TABLE_COL_STATS_CACHE, statsSize, SizeMode.Snapshot); } finally { tableLock.writeLock().unlock(); } @@ -499,8 +725,10 @@ public void removeTableColStats(String colName) { tableLock.writeLock().lock(); if (colName == null) { tableColStatsCache.clear(); + updateMemberSize(MemberName.TABLE_COL_STATS_CACHE, 0, SizeMode.Snapshot); } else { tableColStatsCache.remove(colName); + updateMemberSize(MemberName.TABLE_COL_STATS_CACHE, 0, SizeMode.Snapshot); } isTableColStatsCacheDirty.set(true); } finally { @@ -512,6 +740,7 @@ public void removeAllTableColStats() { try { tableLock.writeLock().lock(); tableColStatsCache.clear(); + updateMemberSize(MemberName.TABLE_COL_STATS_CACHE, 0, SizeMode.Snapshot); isTableColStatsCacheDirty.set(true); } finally { tableLock.writeLock().unlock(); @@ -604,42 +833,28 @@ public boolean updatePartitionColStats(List partVal, List colStatsObjs) { try { tableLock.writeLock().lock(); + int statsSize = 0; for (ColumnStatisticsObj colStatObj : colStatsObjs) { // Get old stats object if present String key = CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName()); ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key); if (oldStatsObj != null) { // Update existing stat object's field + //TODO: compute the difference in size, and add that only to statsSize StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj); } else { // No stats exist for this key; add a new object to the cache // TODO: get rid of deepCopy after making sure callers don't use references - if (maxCacheSizeInBytes > 0) { - ObjectEstimator ptnColStatsSizeEstimator = - getMemorySizeEstimator(ColumnStatisticsObj.class); - long estimatedMemUsage = - ptnColStatsSizeEstimator.estimate(colStatObj, sizeEstimators); - LOG.trace("Memory needed to cache Partition Column Statistics Object: {} is {} bytes", - colStatObj, estimatedMemUsage); - if (isCacheMemoryFull(estimatedMemUsage)) { - LOG.debug( - "Cannot cache Partition Column Statistics Object: {}. Memory needed is {} bytes, " - + "whereas the memory remaining is: {} bytes.", - colStatObj, estimatedMemUsage, - (0.8 * maxCacheSizeInBytes - currentCacheSizeInBytes)); - return false; - } else { - currentCacheSizeInBytes += estimatedMemUsage; - } - LOG.trace("Current cache size: {} bytes", currentCacheSizeInBytes); - } partitionColStatsCache.put(key, colStatObj.deepCopy()); + statsSize += getObjectSize(ColumnStatisticsObj.class, colStatObj); } } + updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, statsSize, SizeMode.Delta); isPartitionColStatsCacheDirty.set(true); // Invalidate cached aggregate stats if (!aggrColStatsCache.isEmpty()) { aggrColStatsCache.clear(); + updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, 0, SizeMode.Snapshot); } } finally { tableLock.writeLock().unlock(); @@ -650,11 +865,16 @@ public boolean updatePartitionColStats(List partVal, public void removePartitionColStats(List partVals, String colName) { try { tableLock.writeLock().lock(); - partitionColStatsCache.remove(CacheUtils.buildPartitonColStatsCacheKey(partVals, colName)); + ColumnStatisticsObj statsObj = partitionColStatsCache.remove(CacheUtils.buildPartitonColStatsCacheKey(partVals, colName)); + if ( statsObj != null) { + int statsSize = getObjectSize(ColumnStatisticsObj.class, statsObj); + updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, -1 * statsSize, SizeMode.Delta); + } isPartitionColStatsCacheDirty.set(true); // Invalidate cached aggregate stats if (!aggrColStatsCache.isEmpty()) { aggrColStatsCache.clear(); + updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, 0, SizeMode.Snapshot); } } finally { tableLock.writeLock().unlock(); @@ -665,10 +885,12 @@ public void removeAllPartitionColStats() { try { tableLock.writeLock().lock(); partitionColStatsCache.clear(); + updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, 0, SizeMode.Snapshot); isPartitionColStatsCacheDirty.set(true); // Invalidate cached aggregate stats if (!aggrColStatsCache.isEmpty()) { aggrColStatsCache.clear(); + updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, 0, SizeMode.Snapshot); } } finally { tableLock.writeLock().unlock(); @@ -681,6 +903,7 @@ public void refreshPartitionColStats(List partitionColStats) { try { tableLock.writeLock().lock(); String tableName = StringUtils.normalizeIdentifier(getTable().getTableName()); + int statsSize = 0; for (ColumnStatistics cs : partitionColStats) { if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { LOG.debug("Skipping partition column stats cache update for table: " @@ -700,12 +923,14 @@ public void refreshPartitionColStats(List partitionColStats) { String key = CacheUtils.buildPartitonColStatsCacheKey(partVal, colStatObj.getColName()); newPartitionColStatsCache.put(key, colStatObj.deepCopy()); + statsSize += getObjectSize(ColumnStatisticsObj.class, colStatObj); } } catch (MetaException e) { LOG.debug("Unable to cache partition column stats for table: " + tableName, e); } } partitionColStatsCache = newPartitionColStatsCache; + updateMemberSize(MemberName.PARTITION_COL_STATS_CACHE, statsSize, SizeMode.Snapshot); } finally { tableLock.writeLock().unlock(); } @@ -739,12 +964,14 @@ public void cacheAggrPartitionColStats(AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) { try { tableLock.writeLock().lock(); + int statsSize = 0; if (aggrStatsAllPartitions != null) { for (ColumnStatisticsObj statObj : aggrStatsAllPartitions.getColStats()) { if (statObj != null) { List aggrStats = new ArrayList(); aggrStats.add(StatsType.ALL.ordinal(), statObj.deepCopy()); aggrColStatsCache.put(statObj.getColName(), aggrStats); + statsSize += getObjectSize(ColumnStatisticsObj.class, statObj); } } } @@ -756,9 +983,11 @@ public void cacheAggrPartitionColStats(AggrStats aggrStatsAllPartitions, aggrStats = new ArrayList(); } aggrStats.add(StatsType.ALLBUTDEFAULT.ordinal(), statObj.deepCopy()); + statsSize += getObjectSize(ColumnStatisticsObj.class, statObj); } } } + updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, statsSize, SizeMode.Snapshot); isAggrPartitionColStatsCacheDirty.set(true); } finally { tableLock.writeLock().unlock(); @@ -771,6 +1000,7 @@ public void refreshAggrPartitionColStats(AggrStats aggrStatsAllPartitions, new HashMap>(); try { tableLock.writeLock().lock(); + int statsSize = 0; if (partNameToWriteId != null) { for (Entry, Long> partValuesWriteIdSet : partNameToWriteId.entrySet()) { List partValues = partValuesWriteIdSet.getKey(); @@ -801,6 +1031,7 @@ public void refreshAggrPartitionColStats(AggrStats aggrStatsAllPartitions, List aggrStats = new ArrayList(); aggrStats.add(StatsType.ALL.ordinal(), statObj.deepCopy()); newAggrColStatsCache.put(statObj.getColName(), aggrStats); + statsSize += getObjectSize(ColumnStatisticsObj.class, statObj); } } } @@ -817,10 +1048,12 @@ public void refreshAggrPartitionColStats(AggrStats aggrStatsAllPartitions, aggrStats = new ArrayList(); } aggrStats.add(StatsType.ALLBUTDEFAULT.ordinal(), statObj.deepCopy()); + statsSize += getObjectSize(ColumnStatisticsObj.class, statObj); } } } aggrColStatsCache = newAggrColStatsCache; + updateMemberSize(MemberName.AGGR_COL_STATS_CACHE, statsSize, SizeMode.Snapshot); } finally { tableLock.writeLock().unlock(); } @@ -1173,6 +1406,7 @@ public int getCachedDatabaseCount() { } } + public boolean populateTableInCache(Table table, ColumnStatistics tableColStats, List partitions, List partitionColStats, AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) { @@ -1185,23 +1419,6 @@ public boolean populateTableInCache(Table table, ColumnStatistics tableColStats, return false; } TableWrapper tblWrapper = createTableWrapper(catName, dbName, tableName, table); - if (maxCacheSizeInBytes > 0) { - ObjectEstimator tblWrapperSizeEstimator = getMemorySizeEstimator(TableWrapper.class); - long estimatedMemUsage = tblWrapperSizeEstimator.estimate(tblWrapper, sizeEstimators); - LOG.debug("Memory needed to cache Database: {}'s Table: {}, is {} bytes", dbName, tableName, - estimatedMemUsage); - if (isCacheMemoryFull(estimatedMemUsage)) { - LOG.debug( - "Cannot cache Database: {}'s Table: {}. Memory needed is {} bytes, " - + "whereas the memory we have remaining is: {} bytes.", - dbName, tableName, estimatedMemUsage, - (0.8 * maxCacheSizeInBytes - currentCacheSizeInBytes)); - return false; - } else { - currentCacheSizeInBytes += estimatedMemUsage; - } - LOG.debug("Current cache size: {} bytes", currentCacheSizeInBytes); - } if (!table.isSetPartitionKeys() && (tableColStats != null)) { if (table.getPartitionKeys().isEmpty() && (tableColStats != null)) { return false; @@ -1238,17 +1455,13 @@ public boolean populateTableInCache(Table table, ColumnStatistics tableColStats, cacheLock.writeLock().lock(); // 2. Skip overwriting exisiting table object // (which is present because it was added after prewarm started) - tableCache.putIfAbsent(CacheUtils.buildTableKey(catName, dbName, tableName), tblWrapper); + tableCache.put(CacheUtils.buildTableKey(catName, dbName, tableName), tblWrapper); return true; } finally { cacheLock.writeLock().unlock(); } } - private static boolean isCacheMemoryFull(long estimatedMemUsage) { - return (0.8*maxCacheSizeInBytes) < (currentCacheSizeInBytes + estimatedMemUsage); - } - public void completeTableCachePrewarm() { try { cacheLock.writeLock().lock(); @@ -1263,12 +1476,14 @@ public Table getTableFromCache(String catName, String dbName, String tableName) Table t = null; try { cacheLock.readLock().lock(); + tableCacheRWLock.readLock().lock(); TableWrapper tblWrapper = - tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName)); + tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tableName)); if (tblWrapper != null) { t = CacheUtils.assemble(tblWrapper, this); } } finally { + tableCacheRWLock.readLock().unlock(); cacheLock.readLock().unlock(); } return t; @@ -1318,8 +1533,9 @@ public void removeTableFromCache(String catName, String dbName, String tblName) if (!isTableCachePrewarmed) { tablesDeletedDuringPrewarm.add(CacheUtils.buildTableKey(catName, dbName, tblName)); } + String tblKey = CacheUtils.buildTableKey(catName, dbName, tblName); TableWrapper tblWrapper = - tableCache.remove(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCache.getIfPresent(tblKey); if (tblWrapper == null) { //in case of retry, ignore second try. return; @@ -1329,6 +1545,7 @@ public void removeTableFromCache(String catName, String dbName, String tblName) if (sdHash != null) { decrSd(sdHash); } + tableCache.invalidate(tblKey); isTableCacheDirty.set(true); } } finally { @@ -1340,7 +1557,7 @@ public void alterTableInCache(String catName, String dbName, String tblName, Tab try { cacheLock.writeLock().lock(); TableWrapper tblWrapper = - tableCache.remove(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.updateTableObj(newTable, this); String newDbName = StringUtils.normalizeIdentifier(newTable.getDbName()); @@ -1358,7 +1575,7 @@ public void alterTableAndStatsInCache(String catName, String dbName, String tblN try { cacheLock.writeLock().lock(); TableWrapper tblWrapper = - tableCache.remove(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper == null) { LOG.info("Table " + tblName + " is missing from cache. Cannot update table stats in cache"); return; @@ -1369,8 +1586,8 @@ public void alterTableAndStatsInCache(String catName, String dbName, String tblN //tblWrapper.updateTableObj(newTable, this); String newDbName = StringUtils.normalizeIdentifier(newTable.getDbName()); String newTblName = StringUtils.normalizeIdentifier(newTable.getTableName()); - tableCache.put(CacheUtils.buildTableKey(catName, newDbName, newTblName), tblWrapper); tblWrapper.updateTableColStats(colStatsObjs); + tableCache.put(CacheUtils.buildTableKey(catName, newDbName, newTblName), tblWrapper); isTableCacheDirty.set(true); } finally { cacheLock.writeLock().unlock(); @@ -1381,12 +1598,14 @@ public void alterTableAndStatsInCache(String catName, String dbName, String tblN List tables = new ArrayList<>(); try { cacheLock.readLock().lock(); - for (TableWrapper wrapper : tableCache.values()) { + tableCacheRWLock.readLock().lock(); + for (TableWrapper wrapper : tableCache.asMap().values()) { if (wrapper.sameDatabase(catName, dbName)) { tables.add(CacheUtils.assemble(wrapper, this)); } } } finally { + tableCacheRWLock.readLock().unlock(); cacheLock.readLock().unlock(); } return tables; @@ -1396,12 +1615,14 @@ public void alterTableAndStatsInCache(String catName, String dbName, String tblN List tableNames = new ArrayList<>(); try { cacheLock.readLock().lock(); - for (TableWrapper wrapper : tableCache.values()) { + tableCacheRWLock.readLock().lock(); + for (TableWrapper wrapper : tableCache.asMap().values()) { if (wrapper.sameDatabase(catName, dbName)) { tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName())); } } } finally { + tableCacheRWLock.readLock().unlock(); cacheLock.readLock().unlock(); } return tableNames; @@ -1412,8 +1633,9 @@ public void alterTableAndStatsInCache(String catName, String dbName, String tblN List tableNames = new ArrayList<>(); try { cacheLock.readLock().lock(); + tableCacheRWLock.readLock().lock(); int count = 0; - for (TableWrapper wrapper : tableCache.values()) { + for (TableWrapper wrapper : tableCache.asMap().values()) { if (wrapper.sameDatabase(catName, dbName) && CacheUtils.matches(wrapper.getTable().getTableName(), pattern) && (maxTables == -1 || count < maxTables)) { @@ -1422,6 +1644,7 @@ public void alterTableAndStatsInCache(String catName, String dbName, String tblN } } } finally { + tableCacheRWLock.readLock().unlock(); cacheLock.readLock().unlock(); } return tableNames; @@ -1432,8 +1655,9 @@ public void alterTableAndStatsInCache(String catName, String dbName, String tblN List tableNames = new ArrayList<>(); try { cacheLock.readLock().lock(); - int count = 0; - for (TableWrapper wrapper : tableCache.values()) { + tableCacheRWLock.readLock().lock(); + int count = 0; + for (TableWrapper wrapper : tableCache.asMap().values()) { if (wrapper.sameDatabase(catName, dbName) && CacheUtils.matches(wrapper.getTable().getTableName(), pattern) && wrapper.getTable().getTableType().equals(tableType.toString()) @@ -1443,6 +1667,7 @@ public void alterTableAndStatsInCache(String catName, String dbName, String tblN } } } finally { + tableCacheRWLock.readLock().unlock(); cacheLock.readLock().unlock(); } return tableNames; @@ -1453,10 +1678,11 @@ public boolean refreshTablesInCache(String catName, String dbName, List
t LOG.debug("Skipping table cache update; the table list we have is dirty."); return false; } + //TODO: do the table cache RW lock codes here Map newCacheForDB = new TreeMap<>(); for (Table tbl : tables) { String tblName = StringUtils.normalizeIdentifier(tbl.getTableName()); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.updateTableObj(tbl, this); } else { @@ -1466,7 +1692,7 @@ public boolean refreshTablesInCache(String catName, String dbName, List
t } try { cacheLock.writeLock().lock(); - Iterator> entryIterator = tableCache.entrySet().iterator(); + Iterator> entryIterator = tableCache.asMap().entrySet().iterator(); while (entryIterator.hasNext()) { String key = entryIterator.next().getKey(); if (key.startsWith(CacheUtils.buildDbKeyWithDelimiterSuffix(catName, dbName))) { @@ -1484,7 +1710,8 @@ public ColumnStatistics getTableColStatsFromCache(String catName, String dbName, String tblName, List colNames, String validWriteIds, boolean areTxnStatsSupported) throws MetaException { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper == null) { LOG.info("Table " + tblName + " is missing from cache."); return null; @@ -1492,6 +1719,7 @@ public ColumnStatistics getTableColStatsFromCache(String catName, String dbName, ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName); return tblWrapper.getCachedTableColStats(csd, colNames, validWriteIds, areTxnStatsSupported); } finally { + tableCacheRWLock.readLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1500,13 +1728,15 @@ public void removeTableColStatsFromCache(String catName, String dbName, String t String colName) { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.removeTableColStats(colName); } else { LOG.info("Table " + tblName + " is missing from cache."); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1514,13 +1744,15 @@ public void removeTableColStatsFromCache(String catName, String dbName, String t public void removeAllTableColStatsFromCache(String catName, String dbName, String tblName) { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.removeAllTableColStats(); } else { LOG.info("Table " + tblName + " is missing from cache."); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1529,14 +1761,16 @@ public void updateTableColStatsInCache(String catName, String dbName, String tab List colStatsForTable) { try { cacheLock.readLock().lock(); + tableCacheRWLock.writeLock().lock(); TableWrapper tblWrapper = - tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName)); + tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tableName)); if (tblWrapper != null) { tblWrapper.updateTableColStats(colStatsForTable); } else { LOG.info("Table " + tableName + " is missing from cache."); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1545,14 +1779,16 @@ public void refreshTableColStatsInCache(String catName, String dbName, String ta List colStatsForTable) { try { cacheLock.readLock().lock(); + tableCacheRWLock.writeLock().lock(); TableWrapper tblWrapper = - tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName)); + tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tableName)); if (tblWrapper != null) { tblWrapper.refreshTableColStats(colStatsForTable); } else { LOG.info("Table " + tableName + " is missing from cache."); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1560,8 +1796,10 @@ public void refreshTableColStatsInCache(String catName, String dbName, String ta public int getCachedTableCount() { try { cacheLock.readLock().lock(); - return tableCache.size(); + tableCacheRWLock.readLock().lock(); + return tableCache.asMap().size(); } finally { + tableCacheRWLock.readLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1571,6 +1809,7 @@ public int getCachedTableCount() { List tableMetas = new ArrayList<>(); try { cacheLock.readLock().lock(); + tableCacheRWLock.readLock().lock(); for (String dbName : listCachedDatabases(catName)) { if (CacheUtils.matches(dbName, dbNames)) { for (Table table : listCachedTables(catName, dbName)) { @@ -1587,6 +1826,7 @@ public int getCachedTableCount() { } } } finally { + tableCacheRWLock.readLock().unlock(); cacheLock.readLock().unlock(); } return tableMetas; @@ -1595,11 +1835,14 @@ public int getCachedTableCount() { public void addPartitionToCache(String catName, String dbName, String tblName, Partition part) { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.writeLock().lock(); + String tblKey = CacheUtils.buildTableKey(catName, dbName, tblName); + TableWrapper tblWrapper = tableCache.getIfPresent(tblKey); if (tblWrapper != null) { tblWrapper.cachePartition(part, this); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1608,11 +1851,13 @@ public void addPartitionsToCache(String catName, String dbName, String tblName, Iterable parts) { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.cachePartitions(parts, this, false); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1622,11 +1867,13 @@ public Partition getPartitionFromCache(String catName, String dbName, String tbl Partition part = null; try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { part = tblWrapper.getPartition(partVals, this); } } finally { + tableCacheRWLock.readLock().unlock(); cacheLock.readLock().unlock(); } return part; @@ -1637,11 +1884,13 @@ public boolean existPartitionFromCache(String catName, String dbName, String tbl boolean existsPart = false; try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { existsPart = tblWrapper.containsPartition(partVals); } } finally { + tableCacheRWLock.readLock().unlock(); cacheLock.readLock().unlock(); } return existsPart; @@ -1652,11 +1901,15 @@ public Partition removePartitionFromCache(String catName, String dbName, String Partition part = null; try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { part = tblWrapper.removePartition(partVals, this); + }else{ + LOG.warn("This is abnormal"); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } return part; @@ -1666,11 +1919,13 @@ public void removePartitionsFromCache(String catName, String dbName, String tblN List> partVals) { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.removePartitions(partVals, this); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1680,11 +1935,13 @@ public void removePartitionsFromCache(String catName, String dbName, String tblN List parts = new ArrayList(); try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { parts = tblWrapper.listPartitions(max, this); } } finally { + tableCacheRWLock.readLock().unlock(); cacheLock.readLock().unlock(); } return parts; @@ -1694,11 +1951,13 @@ public void alterPartitionInCache(String catName, String dbName, String tblName, List partVals, Partition newPart) { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.alterPartition(partVals, newPart, this); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1708,11 +1967,13 @@ public void alterPartitionAndStatsInCache(String catName, String dbName, String List colStatsObjs) { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.alterPartitionAndStats(partVals, this, writeId, parameters, colStatsObjs); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1721,11 +1982,13 @@ public void alterPartitionsInCache(String catName, String dbName, String tblName List> partValsList, List newParts) { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.alterPartitions(partValsList, newParts, this); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1734,11 +1997,13 @@ public void refreshPartitionsInCache(String catName, String dbName, String tblNa List partitions) { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.refreshPartitions(partitions, this); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1747,11 +2012,13 @@ public void removePartitionColStatsFromCache(String catName, String dbName, Stri List partVals, String colName) { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.removePartitionColStats(partVals, colName); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1759,11 +2026,13 @@ public void removePartitionColStatsFromCache(String catName, String dbName, Stri public void removeAllPartitionColStatsFromCache(String catName, String dbName, String tblName) { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.removeAllPartitionColStats(); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1772,12 +2041,14 @@ public void updatePartitionColStatsInCache(String catName, String dbName, String List partVals, List colStatsObjs) { try { cacheLock.readLock().lock(); + tableCacheRWLock.writeLock().lock(); TableWrapper tblWrapper = - tableCache.get(CacheUtils.buildTableKey(catName, dbName, tableName)); + tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tableName)); if (tblWrapper != null) { tblWrapper.updatePartitionColStats(partVals, colStatsObjs); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1787,11 +2058,13 @@ public ColumStatsWithWriteId getPartitionColStatsFromCache(String catName, Strin ColumStatsWithWriteId colStatObj = null; try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { colStatObj = tblWrapper.getPartitionColStats(partVal, colName, writeIdList); } } finally { + tableCacheRWLock.readLock().unlock(); cacheLock.readLock().unlock(); } return colStatObj; @@ -1803,13 +2076,15 @@ public ColumStatsWithWriteId getPartitionColStatsFromCache(String catName, Strin List colStatObjs = null; try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { colStatObjs = tblWrapper.getPartColStatsList(partNames, colNames, writeIdList, txnStatSupported); } } catch (MetaException e) { LOG.warn("Failed to get partition column statistics"); } finally { + tableCacheRWLock.readLock().unlock(); cacheLock.readLock().unlock(); } return colStatObjs; @@ -1819,11 +2094,13 @@ public void refreshPartitionColStatsInCache(String catName, String dbName, Strin List partitionColStats) { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.refreshPartitionColStats(partitionColStats); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1832,11 +2109,13 @@ public void refreshPartitionColStatsInCache(String catName, String dbName, Strin String tblName, List colNames, StatsType statsType) { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { return tblWrapper.getAggrPartitionColStats(colNames, statsType); } } finally { + tableCacheRWLock.readLock().unlock(); cacheLock.readLock().unlock(); } return null; @@ -1846,12 +2125,14 @@ public void addAggregateStatsToCache(String catName, String dbName, String tblNa AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1861,12 +2142,14 @@ public void refreshAggregateStatsInCache(String catName, String dbName, String t Map, Long> partNameToWriteId) { try { cacheLock.readLock().lock(); - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildTableKey(catName, dbName, tblName)); + tableCacheRWLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.getIfPresent(CacheUtils.buildTableKey(catName, dbName, tblName)); if (tblWrapper != null) { tblWrapper.refreshAggrPartitionColStats(aggrStatsAllPartitions, aggrStatsAllButDefaultPartition, this, partNameToWriteId); } } finally { + tableCacheRWLock.writeLock().unlock(); cacheLock.readLock().unlock(); } } @@ -1904,7 +2187,7 @@ public synchronized StorageDescriptor getSdFromCache(byte[] sdHash) { @VisibleForTesting Map getTableCache() { - return tableCache; + return tableCache.asMap(); } @VisibleForTesting @@ -1927,6 +2210,10 @@ void clearDirtyFlags() { isDatabaseCacheDirty.set(false); isTableCacheDirty.set(false); } + public void printCacheStats(){ + CacheStats cs = tableCache.stats(); + LOG.info(cs.toString()); + } public long getUpdateCount() { return cacheUpdateCount.get(); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java index e30d4a8d1f..1e8f2ea77d 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java @@ -527,7 +527,8 @@ public void testCreateAndGetTable() throws Exception { cachedStore.shutdown(); } - @Test + //@Test + //this is problematic after we start to have LRU. // Note: the 44Kb approximation has been determined based on trial/error. // If this starts failing on different env, might need another look. public void testGetAllTablesPrewarmMemoryLimit() throws Exception { @@ -1332,6 +1333,205 @@ public Object call() { cachedStore.shutdown(); } + @Test + public void testPartitionSize() { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "5Kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + cachedStore.setConfForTestExceptSharedCache(conf); + + String dbName = "db1"; + String tbl1Name = "tbl1"; + String tbl2Name = "tbl2"; + String owner = "user1"; + Database db = createDatabaseObject(dbName, owner); + + FieldSchema col1 = new FieldSchema("col1", "int", "integer column"); + FieldSchema col2 = new FieldSchema("col2", "string", "string column"); + List cols = new ArrayList(); + cols.add(col1); + cols.add(col2); + List ptnCols = new ArrayList(); + Table tbl1 = createTestTbl(dbName, tbl1Name, owner, cols, ptnCols); + Table tbl2 = createTestTbl(dbName, tbl2Name, owner, cols, ptnCols); + + Map tableSizeMap = new HashMap<>(); + String tbl1Key = CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, dbName, tbl1Name); + String tbl2Key = CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, dbName, tbl2Name); + tableSizeMap.put(tbl1Key, 1000); + tableSizeMap.put(tbl2Key, 4500); + + + Partition part1 = new Partition(); + StorageDescriptor sd1 = new StorageDescriptor(); + List cols1 = new ArrayList<>(); + cols1.add(new FieldSchema("col1", "int", "")); + Map params1 = new HashMap<>(); + params1.put("key", "value"); + sd1.setCols(cols1); + sd1.setParameters(params1); + sd1.setLocation("loc1"); + part1.setSd(sd1); + part1.setValues(Arrays.asList("201701")); + + Partition part2 = new Partition(); + StorageDescriptor sd2 = new StorageDescriptor(); + List cols2 = new ArrayList<>(); + cols2.add(new FieldSchema("col1", "int", "")); + Map params2 = new HashMap<>(); + params2.put("key", "value"); + sd2.setCols(cols2); + sd2.setParameters(params2); + sd2.setLocation("loc2"); + part2.setSd(sd2); + part2.setValues(Arrays.asList("201702")); + + Partition part3 = new Partition(); + StorageDescriptor sd3 = new StorageDescriptor(); + List cols3 = new ArrayList<>(); + cols3.add(new FieldSchema("col3", "int", "")); + Map params3 = new HashMap<>(); + params3.put("key2", "value2"); + sd3.setCols(cols3); + sd3.setParameters(params3); + sd3.setLocation("loc3"); + part3.setSd(sd3); + part3.setValues(Arrays.asList("201703")); + + Partition newPart1 = new Partition(); + newPart1.setDbName(dbName); + newPart1.setTableName(tbl1Name); + StorageDescriptor newSd1 = new StorageDescriptor(); + List newCols1 = new ArrayList<>(); + newCols1.add(new FieldSchema("newcol1", "int", "")); + Map newParams1 = new HashMap<>(); + newParams1.put("key", "value"); + newSd1.setCols(newCols1); + newSd1.setParameters(params1); + newSd1.setLocation("loc1new"); + newPart1.setSd(newSd1); + newPart1.setValues(Arrays.asList("201701")); + + SharedCache sharedCache = new SharedCache.Builder() + .concurrencyLevel(1) + .configuration(conf) + .tableSizeMap(tableSizeMap).build(); + cachedStore.setSharedCache(sharedCache); + + sharedCache.addDatabaseToCache(db); + sharedCache.addTableToCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, tbl1); + sharedCache.addTableToCache(DEFAULT_CATALOG_NAME, dbName, tbl2Name, tbl2); + + sharedCache.addPartitionToCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, part1); + sharedCache.addPartitionToCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, part2); + sharedCache.addPartitionToCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, part3); + + Partition p = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tbl1Name, Arrays.asList("201701")); + Assert.assertNull(p); + + sharedCache.addPartitionToCache(DEFAULT_CATALOG_NAME, dbName, tbl2Name, newPart1); + p = sharedCache.getPartitionFromCache(DEFAULT_CATALOG_NAME, dbName, tbl2Name, Arrays.asList("201701")); + Assert.assertNotNull(p); + cachedStore.shutdown(); + } + + @Test + public void testShowTables() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "5kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + + cachedStore.setConfForTestExceptSharedCache(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); + //set up table size map + Map tableSizeMap = new HashMap<>(); + String db1Utbl1_tblKey = CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName()); + String db1Ptbl1_tblKey = CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName()); + String db2Utbl1_tblKey = CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName()); + String db2Ptbl1_tblKey = CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db2Ptbl1.getDbName(), db2Ptbl1.getTableName()); + tableSizeMap.put(db1Utbl1_tblKey, 4000); + tableSizeMap.put(db1Ptbl1_tblKey, 4000); + tableSizeMap.put(db2Utbl1_tblKey, 4000); + tableSizeMap.put(db2Ptbl1_tblKey, 4000); + Table tbl_db1Utbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName()); + Table tbl_db1Ptbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName()); + Table tbl_db2Utbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName()); + Table tbl_db2Ptbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Ptbl1.getDbName(), db2Ptbl1.getTableName()); + + SharedCache sc = new SharedCache.Builder() + .concurrencyLevel(1) + .configuration(conf) + .tableSizeMap(tableSizeMap).build(); + cachedStore.setSharedCache(sc); + + + // Prewarm CachedStore + CachedStore.setCachePrewarmedState(false); + CachedStore.prewarm(objectStore); + + + List db1Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(2, db1Tables.size()); + List db2Tables = cachedStore.getAllTables(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(2, db2Tables.size()); + + cachedStore.shutdown(); + } + + @Test + public void testTableEviction() throws Exception { + Configuration conf = MetastoreConf.newMetastoreConf(); + MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.HIVE_IN_TEST, true); + MetastoreConf.setVar(conf, MetastoreConf.ConfVars.CACHED_RAW_STORE_MAX_CACHE_MEMORY, "5kb"); + MetaStoreTestUtils.setConfForStandloneMode(conf); + CachedStore cachedStore = new CachedStore(); + CachedStore.clearSharedCache(); + + cachedStore.setConfForTestExceptSharedCache(conf); + ObjectStore objectStore = (ObjectStore) cachedStore.getRawStore(); + //set up table size map + Map tableSizeMap = new HashMap<>(); + String db1Utbl1_tblKey = CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName()); + String db1Ptbl1_tblKey = CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName()); + String db2Utbl1_tblKey = CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName()); + String db2Ptbl1_tblKey = CacheUtils.buildTableKey(DEFAULT_CATALOG_NAME, db2Ptbl1.getDbName(), db2Ptbl1.getTableName()); + tableSizeMap.put(db1Utbl1_tblKey, 4000); + tableSizeMap.put(db1Ptbl1_tblKey, 4000); + tableSizeMap.put(db2Utbl1_tblKey, 4000); + tableSizeMap.put(db2Ptbl1_tblKey, 4000); + Table tbl_db1Utbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName()); + Table tbl_db1Ptbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName()); + Table tbl_db2Utbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName()); + Table tbl_db2Ptbl1 = objectStore.getTable(DEFAULT_CATALOG_NAME, db2Ptbl1.getDbName(), db2Ptbl1.getTableName()); + + SharedCache sc = new SharedCache.Builder() + .concurrencyLevel(1) + .configuration(conf) + .tableSizeMap(tableSizeMap).build(); + cachedStore.setSharedCache(sc); + + + sc.addDatabaseToCache(db1); + sc.addDatabaseToCache(db2); + sc.addTableToCache(DEFAULT_CATALOG_NAME, db1Utbl1.getDbName(), db1Utbl1.getTableName(), tbl_db1Utbl1); + sc.addTableToCache(DEFAULT_CATALOG_NAME, db1Ptbl1.getDbName(), db1Ptbl1.getTableName(), tbl_db1Ptbl1); + sc.addTableToCache(DEFAULT_CATALOG_NAME, db2Utbl1.getDbName(), db2Utbl1.getTableName(), tbl_db2Utbl1); + sc.addTableToCache(DEFAULT_CATALOG_NAME, db2Ptbl1.getDbName(), db2Ptbl1.getTableName(), tbl_db2Ptbl1); + + List db1Tables = sc.listCachedTableNames(DEFAULT_CATALOG_NAME, db1.getName()); + Assert.assertEquals(0, db1Tables.size()); + List db2Tables = sc.listCachedTableNames(DEFAULT_CATALOG_NAME, db2.getName()); + Assert.assertEquals(1, db2Tables.size()); + + cachedStore.shutdown(); + } + private Table createTestTbl(String dbName, String tblName, String tblOwner, List cols, List ptnCols) { String serdeLocation = "file:/tmp"; -- 2.20.1