diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java index b3d99a1da5..12ad7bd4bc 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java @@ -7886,6 +7886,13 @@ protected ColumnStatistics getJdoResult( } @Override + public List getPartitionColumnStatisticsForTable(String dbName, + String tblName) { + // TODO Auto-generated method stub + return null; + } + + @Override public List getPartitionColumnStatistics(String dbName, String tableName, List partNames, List colNames) throws MetaException, NoSuchObjectException { return getPartitionColumnStatisticsInternal( @@ -7971,35 +7978,6 @@ protected String describeResult() { } @Override - public List getPartitionColStatsForDatabase(String dbName) - throws MetaException, NoSuchObjectException { - final boolean enableBitVector = - MetastoreConf.getBoolVar(getConf(), ConfVars.STATS_FETCH_BITVECTOR); - return new GetHelper>(dbName, null, true, false) { - @Override - protected List getSqlResult( - GetHelper> ctx) throws MetaException { - return directSql.getColStatsForAllTablePartitions(dbName, enableBitVector); - } - - @Override - protected List getJdoResult( - GetHelper> ctx) - throws MetaException, NoSuchObjectException { - // This is fast path for query optimizations, if we can find this info - // quickly using directSql, do it. No point in failing back to slow path - // here. 
- throw new MetaException("Jdo path is not implemented for getPartitionColStatsForDatabase."); - } - - @Override - protected String describeResult() { - return null; - } - }.run(true); - } - - @Override public void flushCache() { // NOP as there's no caching } @@ -9927,7 +9905,7 @@ public WMFullResourcePlan alterResourcePlan(String name, WMNullableResourcePlan } else { result = handleSimpleAlter(name, changes, canActivateDisabled, canDeactivate); } - + commited = commitTransaction(); return result; } catch (Exception e) { diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java index e4e7d4239d..4816613555 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java @@ -444,6 +444,14 @@ ColumnStatistics getTableColumnStatistics(String dbName, String tableName, throws MetaException, NoSuchObjectException; /** + * + * @param dbName + * @param tblName + * @return Column Stats for all partitions of a table + */ + List getPartitionColumnStatisticsForTable(String dbName, String tblName); + + /** * Deletes column statistics if present associated with a given db, table, partition and col. If * null is passed instead of a colName, stats when present for all columns associated * with a given db, table and partition are deleted. 
@@ -602,17 +610,6 @@ AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, List colNames) throws MetaException, NoSuchObjectException; /** - * Get column stats for all partitions of all tables in the database - * - * @param dbName - * @return List of column stats objects for all partitions of all tables in the database - * @throws MetaException - * @throws NoSuchObjectException - */ - List getPartitionColStatsForDatabase(String dbName) - throws MetaException, NoSuchObjectException; - - /** * Get the next notification event. * @param rqst Request containing information on the last processed notification. * @return list of notifications, sorted by eventId diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java index f0f650ddcf..8c9fb285e5 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java @@ -27,8 +27,8 @@ import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper; -import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper; +import org.apache.hadoop.hive.metastore.cache.SharedCache.PartitionWrapper; +import org.apache.hadoop.hive.metastore.cache.SharedCache.TableWrapper; import org.apache.hadoop.hive.metastore.utils.StringUtils; public class CacheUtils { @@ -59,6 +59,19 @@ public static String buildKey(String dbName, String tableName, List part return key; } + public static String buildKey(List partVals) { + String key = ""; + if (CollectionUtils.isNotEmpty(partVals)) { + key += String.join(delimit, partVals); + } + return key; + } + + public static String 
buildKey(List partVals, String colName) { + String key = buildKey(partVals); + return key + delimit + colName; + } + public static String buildKeyWithDelimit(String dbName, String tableName, List partVals) { return buildKey(dbName, tableName, partVals) + delimit; } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index 80aa3bcdb4..4f629a2757 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -31,8 +31,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -90,7 +88,6 @@ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableMeta; import org.apache.hadoop.hive.metastore.api.Type; @@ -118,28 +115,12 @@ // TODO constraintCache // TODO need sd nested copy? // TODO String intern -// TODO restructure HBaseStore // TODO monitor event queue // TODO initial load slow? 
// TODO size estimation -// TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation public class CachedStore implements RawStore, Configurable { private static ScheduledExecutorService cacheUpdateMaster = null; - private static ReentrantReadWriteLock databaseCacheLock = new ReentrantReadWriteLock(true); - private static AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false); - private static ReentrantReadWriteLock tableCacheLock = new ReentrantReadWriteLock(true); - private static AtomicBoolean isTableCacheDirty = new AtomicBoolean(false); - private static ReentrantReadWriteLock partitionCacheLock = new ReentrantReadWriteLock(true); - private static AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false); - private static ReentrantReadWriteLock tableColStatsCacheLock = new ReentrantReadWriteLock(true); - private static AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false); - private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock( - true); - private static ReentrantReadWriteLock partitionAggrColStatsCacheLock = - new ReentrantReadWriteLock(true); - private static AtomicBoolean isPartitionAggrColStatsCacheDirty = new AtomicBoolean(false); - private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false); private static List whitelistPatterns = null; private static List blacklistPatterns = null; private RawStore rawStore = null; @@ -154,71 +135,6 @@ static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName()); - static class TableWrapper { - Table t; - String location; - Map parameters; - byte[] sdHash; - TableWrapper(Table t, byte[] sdHash, String location, Map parameters) { - this.t = t; - this.sdHash = sdHash; - this.location = location; - this.parameters = parameters; - } - public Table getTable() { - return t; - } - public byte[] getSdHash() { - return sdHash; - } - public String getLocation() { - return 
location; - } - public Map getParameters() { - return parameters; - } - } - - static class PartitionWrapper { - Partition p; - String location; - Map parameters; - byte[] sdHash; - PartitionWrapper(Partition p, byte[] sdHash, String location, Map parameters) { - this.p = p; - this.sdHash = sdHash; - this.location = location; - this.parameters = parameters; - } - public Partition getPartition() { - return p; - } - public byte[] getSdHash() { - return sdHash; - } - public String getLocation() { - return location; - } - public Map getParameters() { - return parameters; - } - } - - static class StorageDescriptorWrapper { - StorageDescriptor sd; - int refCount = 0; - StorageDescriptorWrapper(StorageDescriptor sd, int refCount) { - this.sd = sd; - this.refCount = refCount; - } - public StorageDescriptor getSd() { - return sd; - } - public int getRefCount() { - return refCount; - } - } - public CachedStore() { } @@ -270,17 +186,9 @@ static void prewarm(RawStore rawStore) throws Exception { SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); for (int i = 0; i < dbNames.size(); i++) { String dbName = StringUtils.normalizeIdentifier(dbNames.get(i)); - // Cache partition column stats - Deadline.startTimer("getColStatsForDatabase"); - List colStatsForDB = - rawStore.getPartitionColStatsForDatabase(dbName); - Deadline.stopTimer(); - if (colStatsForDB != null) { - sharedCache.addPartitionColStatsToCache(colStatsForDB); - } LOG.info("Caching database: {}. 
Cached {} / {} databases so far.", dbName, i, dbNames.size()); Database db = rawStore.getDatabase(dbName); - sharedCache.addDatabaseToCache(dbName, db); + sharedCache.addDatabaseToCache(db); List tblNames = rawStore.getAllTables(dbName); LOG.debug("Tables in database: {} : {}", dbName, tblNames); for (int j = 0; j < tblNames.size(); j++) { @@ -293,51 +201,53 @@ static void prewarm(RawStore rawStore) throws Exception { tblName, j, tblNames.size()); Table table = null; table = rawStore.getTable(dbName, tblName); + ColumnStatistics tableColStats = null; + List partitions = null; + List partitionColStats = null; + AggrStats aggrStatsAllPartitions = null; + AggrStats aggrStatsAllButDefaultPartition = null; // It is possible the table is deleted during fetching tables of the database, // in that case, continue with the next table if (table == null) { continue; } - sharedCache.addTableToCache(dbName, tblName, table); - if (table.getPartitionKeys() != null && table.getPartitionKeys().size() > 0) { - Deadline.startTimer("getPartitions"); - List partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); - Deadline.stopTimer(); - for (Partition partition : partitions) { - sharedCache.addPartitionToCache(dbName, tblName, partition); - } - } - // Cache table column stats + // Get table column stats List colNames = MetaStoreUtils.getColumnNamesForTable(table); Deadline.startTimer("getTableColumnStatistics"); - ColumnStatistics tableColStats = - rawStore.getTableColumnStatistics(dbName, tblName, colNames); + tableColStats = rawStore.getTableColumnStatistics(dbName, tblName, colNames); Deadline.stopTimer(); - if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) { - sharedCache.addTableColStatsToCache(dbName, tblName, tableColStats.getStatsObj()); - } - // Cache aggregate stats for all partitions of a table and for all but default partition - List partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); - if ((partNames != null) && 
(partNames.size() > 0)) { - AggrStats aggrStatsAllPartitions = - rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); - // Remove default partition from partition names and get aggregate - // stats again - List partKeys = table.getPartitionKeys(); - String defaultPartitionValue = MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); - List partCols = new ArrayList(); - List partVals = new ArrayList(); - for (FieldSchema fs : partKeys) { - partCols.add(fs.getName()); - partVals.add(defaultPartitionValue); + if (table.getPartitionKeys() != null && table.getPartitionKeys().size() > 0) { + Deadline.startTimer("getPartitions"); + partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); + Deadline.stopTimer(); + // Get partition column stats for this table + Deadline.startTimer("getPartitionColumnStatistics"); + partitionColStats = rawStore.getPartitionColumnStatisticsForTable(dbName, tblName); + Deadline.stopTimer(); + // Get aggregate stats for all partitions of a table and for all but default partition + List partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); + if ((partNames != null) && (partNames.size() > 0)) { + aggrStatsAllPartitions = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + // Remove default partition from partition names and get aggregate + // stats again + List partKeys = table.getPartitionKeys(); + String defaultPartitionValue = + MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); + List partCols = new ArrayList(); + List partVals = new ArrayList(); + for (FieldSchema fs : partKeys) { + partCols.add(fs.getName()); + partVals.add(defaultPartitionValue); + } + String defaultPartitionName = FileUtils.makePartName(partCols, partVals); + partNames.remove(defaultPartitionName); + aggrStatsAllButDefaultPartition = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); } - String defaultPartitionName = FileUtils.makePartName(partCols, 
partVals); - partNames.remove(defaultPartitionName); - AggrStats aggrStatsAllButDefaultPartition = - rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); - sharedCache.addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions, - aggrStatsAllButDefaultPartition); } + sharedCache.updateTableCache(table, tableColStats, partitions, partitionColStats, + aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); } } // Notify all blocked threads that prewarm is complete now @@ -439,235 +349,100 @@ public void run() { isFirstRun = false; } } else { - // TODO: prewarm and update can probably be merged. update(); } } + // This method of update will be changed with HIVE-18661, which will replace this altogether + // Please ignore the code duplication with prewarm for now. public void update() { Deadline.registerIfNot(1000000); LOG.debug("CachedStore: updating cached objects"); - try { - List dbNames = rawStore.getAllDatabases(); - if (dbNames != null) { - // Update the database in cache - updateDatabases(rawStore, dbNames); + if (sharedCacheWrapper.isInitialized()) { + try { + Deadline.registerIfNot(1000000); + List dbNames = rawStore.getAllDatabases(); + LOG.info("Number of databases: " + dbNames.size()); + SharedCache sharedCache = sharedCacheWrapper.get(); + List databases = new ArrayList<>(); for (String dbName : dbNames) { - updateDatabasePartitionColStats(rawStore, dbName); - // Update the tables in cache - updateTables(rawStore, dbName); - List tblNames = getAllTablesInternal(dbName, sharedCacheWrapper.getUnsafe()); - for (String tblName : tblNames) { + Database db; + try { + db = rawStore.getDatabase(dbName); + databases.add(db); + } catch (NoSuchObjectException e) { + LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e); + } + } + sharedCache.refreshDatabasesInCache(databases); + for (int i = 0; i < dbNames.size(); i++) { + String dbName = StringUtils.normalizeIdentifier(dbNames.get(i)); + LOG.info("Updating cached 
database: {}. Cached {} / {} databases so far.", dbName, i, dbNames.size()); + Database db = rawStore.getDatabase(dbName); + sharedCache.addDatabaseToCache(db); + List tblNames = rawStore.getAllTables(dbName); + LOG.debug("Tables in database: {} : {}", dbName, tblNames); + for (int j = 0; j < tblNames.size(); j++) { + String tblName = StringUtils.normalizeIdentifier(tblNames.get(j)); if (!shouldCacheTable(dbName, tblName)) { + LOG.info("Not caching database: {}'s table: {}", dbName, tblName); continue; } - // Update the partitions for a table in cache - updateTablePartitions(rawStore, dbName, tblName); - // Update the table column stats for a table in cache - updateTableColStats(rawStore, dbName, tblName); - // Update aggregate column stats cache - updateAggregateStatsCache(rawStore, dbName, tblName); - } - } - } - } catch (Exception e) { - LOG.error("Updating CachedStore: error happen when refresh; ignoring", e); - } - } - - private void updateDatabasePartitionColStats(RawStore rawStore, String dbName) { - try { - Deadline.startTimer("getColStatsForDatabasePartitions"); - List colStatsForDB = - rawStore.getPartitionColStatsForDatabase(dbName); - Deadline.stopTimer(); - if (colStatsForDB != null) { - if (partitionColStatsCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping partition column stats cache update; the partition column stats " - + "list we have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe() - .refreshPartitionColStats(StringUtils.normalizeIdentifier(dbName), colStatsForDB); - } - } - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read partitions column stats of database: {}", - dbName, e); - } finally { - if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) { - partitionColStatsCacheLock.writeLock().unlock(); - } - } - } - - // Update cached aggregate stats for all 
partitions of a table and for all - // but default partition - private void updateAggregateStatsCache(RawStore rawStore, String dbName, String tblName) { - try { - Table table = rawStore.getTable(dbName, tblName); - List partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); - List colNames = MetaStoreUtils.getColumnNamesForTable(table); - if ((partNames != null) && (partNames.size() > 0)) { - Deadline.startTimer("getAggregareStatsForAllPartitions"); - AggrStats aggrStatsAllPartitions = - rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); - Deadline.stopTimer(); - // Remove default partition from partition names and get aggregate stats again - List partKeys = table.getPartitionKeys(); - String defaultPartitionValue = - MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); - List partCols = new ArrayList(); - List partVals = new ArrayList(); - for (FieldSchema fs : partKeys) { - partCols.add(fs.getName()); - partVals.add(defaultPartitionValue); - } - String defaultPartitionName = FileUtils.makePartName(partCols, partVals); - partNames.remove(defaultPartitionName); - Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault"); - AggrStats aggrStatsAllButDefaultPartition = - rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); - Deadline.stopTimer(); - if ((aggrStatsAllPartitions != null) && (aggrStatsAllButDefaultPartition != null)) { - if (partitionAggrColStatsCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isPartitionAggrColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug( - "Skipping aggregate column stats cache update; the aggregate column stats we " - + "have is dirty."); - return; + LOG.info("Caching database: {}'s table: {}. 
Cached {} / {} tables so far.", dbName, + tblName, j, tblNames.size()); + Table table = null; + table = rawStore.getTable(dbName, tblName); + ColumnStatistics tableColStats = null; + List partitions = null; + List partitionColStats = null; + AggrStats aggrStatsAllPartitions = null; + AggrStats aggrStatsAllButDefaultPartition = null; + // It is possible the table is deleted during fetching tables of the database, + // in that case, continue with the next table + if (table == null) { + continue; } - sharedCacheWrapper.getUnsafe().refreshAggregateStatsCache( - StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), + // Get table column stats + List colNames = MetaStoreUtils.getColumnNamesForTable(table); + Deadline.startTimer("getTableColumnStatistics"); + tableColStats = rawStore.getTableColumnStatistics(dbName, tblName, colNames); + Deadline.stopTimer(); + if (table.getPartitionKeys() != null && table.getPartitionKeys().size() > 0) { + Deadline.startTimer("getPartitions"); + partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); + Deadline.stopTimer(); + // Get partition column stats for this table + Deadline.startTimer("getPartitionColumnStatistics"); + partitionColStats = rawStore.getPartitionColumnStatisticsForTable(dbName, tblName); + Deadline.stopTimer(); + // Get aggregate stats for all partitions of a table and for all but default partition + List partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); + if ((partNames != null) && (partNames.size() > 0)) { + aggrStatsAllPartitions = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + // Remove default partition from partition names and get aggregate + // stats again + List partKeys = table.getPartitionKeys(); + String defaultPartitionValue = + MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); + List partCols = new ArrayList(); + List partVals = new ArrayList(); + for (FieldSchema fs : partKeys) { + 
partCols.add(fs.getName()); + partVals.add(defaultPartitionValue); + } + String defaultPartitionName = FileUtils.makePartName(partCols, partVals); + partNames.remove(defaultPartitionName); + aggrStatsAllButDefaultPartition = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + } + } + sharedCache.refreshTableCache(table, tableColStats, partitions, partitionColStats, aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); } } - } - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName, - e); - } finally { - if (partitionAggrColStatsCacheLock.isWriteLockedByCurrentThread()) { - partitionAggrColStatsCacheLock.writeLock().unlock(); - } - } - } - - private void updateDatabases(RawStore rawStore, List dbNames) { - // Prepare the list of databases - List databases = new ArrayList<>(); - for (String dbName : dbNames) { - Database db; - try { - db = rawStore.getDatabase(dbName); - databases.add(db); - } catch (NoSuchObjectException e) { - LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e); - } - } - // Update the cached database objects - try { - if (databaseCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isDatabaseCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping database cache update; the database list we have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe().refreshDatabases(databases); - } - } finally { - if (databaseCacheLock.isWriteLockedByCurrentThread()) { - databaseCacheLock.writeLock().unlock(); - } - } - } - - // Update the cached table objects - private void updateTables(RawStore rawStore, String dbName) { - List tables = new ArrayList<>(); - try { - List tblNames = rawStore.getAllTables(dbName); - for (String tblName : tblNames) { - if (!shouldCacheTable(dbName, tblName)) { - continue; - } - Table table = - 
rawStore.getTable(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); - tables.add(table); - } - if (tableCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isTableCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping table cache update; the table list we have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe().refreshTables(dbName, tables); - } - } catch (MetaException e) { - LOG.info("Updating CachedStore: unable to read tables for database - " + dbName, e); - } finally { - if (tableCacheLock.isWriteLockedByCurrentThread()) { - tableCacheLock.writeLock().unlock(); - } - } - } - - // Update the cached partition objects for a table - private void updateTablePartitions(RawStore rawStore, String dbName, String tblName) { - try { - Deadline.startTimer("getPartitions"); - List partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); - Deadline.stopTimer(); - if (partitionCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isPartitionCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping partition cache update; the partition list we have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe().refreshPartitions( - StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partitions); - } - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); - } finally { - if (partitionCacheLock.isWriteLockedByCurrentThread()) { - partitionCacheLock.writeLock().unlock(); - } - } - } - - // Update the cached col stats for this table - private void updateTableColStats(RawStore rawStore, String dbName, String tblName) { - try { - Table table = rawStore.getTable(dbName, tblName); - List colNames = MetaStoreUtils.getColumnNamesForTable(table); - Deadline.startTimer("getTableColumnStatistics"); - ColumnStatistics tableColStats = 
- rawStore.getTableColumnStatistics(dbName, tblName, colNames); - Deadline.stopTimer(); - if (tableColStats != null) { - if (tableColStatsCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isTableColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping table column stats cache update; the table column stats list we " - + "have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe().refreshTableColStats( - StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); - } - } - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read table column stats of table: " + tblName, e); - } finally { - if (tableColStatsCacheLock.isWriteLockedByCurrentThread()) { - tableColStatsCacheLock.writeLock().unlock(); + } catch (Exception e) { + LOG.error("Updating CachedStore: error happen when refresh; ignoring", e); } } } @@ -707,26 +482,18 @@ public void rollbackTransaction() { public void createDatabase(Database db) throws InvalidObjectException, MetaException { rawStore.createDatabase(db); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; - try { - // Wait if background cache update is happening - databaseCacheLock.readLock().lock(); - isDatabaseCacheDirty.set(true); - sharedCache.addDatabaseToCache(StringUtils.normalizeIdentifier(db.getName()), - db.deepCopy()); - } finally { - databaseCacheLock.readLock().unlock(); + if (sharedCache == null) { + return; } + sharedCache.addDatabaseToCache(db); } @Override public Database getDatabase(String dbName) throws NoSuchObjectException { SharedCache sharedCache; - if (!sharedCacheWrapper.isInitialized()) { return rawStore.getDatabase(dbName); } - try { sharedCache = sharedCacheWrapper.get(); } catch (MetaException e) { @@ -744,34 +511,24 @@ public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaExc boolean succ = 
rawStore.dropDatabase(dbname); if (succ) { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - try { - // Wait if background cache update is happening - databaseCacheLock.readLock().lock(); - isDatabaseCacheDirty.set(true); - sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbname)); - } finally { - databaseCacheLock.readLock().unlock(); + if (sharedCache == null) { + return succ; } + sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbname)); } return succ; } @Override - public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectException, - MetaException { + public boolean alterDatabase(String dbName, Database db) + throws NoSuchObjectException, MetaException { boolean succ = rawStore.alterDatabase(dbName, db); if (succ) { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - try { - // Wait if background cache update is happening - databaseCacheLock.readLock().lock(); - isDatabaseCacheDirty.set(true); - sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(dbName), db); - } finally { - databaseCacheLock.readLock().unlock(); + if (sharedCache == null) { + return succ; } + sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(dbName), db); } return succ; } @@ -782,14 +539,7 @@ public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectExce return rawStore.getDatabases(pattern); } SharedCache sharedCache = sharedCacheWrapper.get(); - List results = new ArrayList<>(); - for (String dbName : sharedCache.listCachedDatabases()) { - dbName = StringUtils.normalizeIdentifier(dbName); - if (CacheUtils.matches(dbName, pattern)) { - results.add(dbName); - } - } - return results; + return sharedCache.listCachedDatabases(pattern); } @Override @@ -843,21 +593,16 @@ public void createTable(Table tbl) throws InvalidObjectException, MetaException return; } SharedCache sharedCache = sharedCacheWrapper.get(); - 
if (sharedCache == null) return; - validateTableType(tbl); - try { - // Wait if background cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.addTableToCache(dbName, tblName, tbl); - } finally { - tableCacheLock.readLock().unlock(); + if (sharedCache == null) { + return; } + validateTableType(tbl); + sharedCache.addTableToCache(dbName, tblName, tbl); } @Override - public boolean dropTable(String dbName, String tblName) throws MetaException, - NoSuchObjectException, InvalidObjectException, InvalidInputException { + public boolean dropTable(String dbName, String tblName) + throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.dropTable(dbName, tblName); if (succ) { dbName = StringUtils.normalizeIdentifier(dbName); @@ -866,25 +611,10 @@ public boolean dropTable(String dbName, String tblName) throws MetaException, return succ; } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - // Remove table - try { - // Wait if background table cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.removeTableFromCache(dbName, tblName); - } finally { - tableCacheLock.readLock().unlock(); - } - // Remove table col stats - try { - // Wait if background table col stats cache update is happening - tableColStatsCacheLock.readLock().lock(); - isTableColStatsCacheDirty.set(true); - sharedCache.removeTableColStatsFromCache(dbName, tblName); - } finally { - tableColStatsCacheLock.readLock().unlock(); + if (sharedCache == null) { + return succ; } + sharedCache.removeTableFromCache(dbName, tblName); } return succ; } @@ -915,24 +645,10 @@ public boolean addPartition(Partition part) throws InvalidObjectException, MetaE return succ; } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - try { - // Wait if background cache update is happening - 
partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.addPartitionToCache(dbName, tblName, part); - } finally { - partitionCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); + if (sharedCache == null) { + return succ; } + sharedCache.addPartitionToCache(dbName, tblName, part); } return succ; } @@ -948,26 +664,10 @@ public boolean addPartitions(String dbName, String tblName, List part return succ; } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - for (Partition part : parts) { - sharedCache.addPartitionToCache(dbName, tblName, part); - } - } finally { - partitionCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); + if (sharedCache == null) { + return succ; } + sharedCache.addPartitionsToCache(dbName, tblName, parts); } return succ; } @@ -983,27 +683,13 @@ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy p return succ; } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - 
PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); - while (iterator.hasNext()) { - Partition part = iterator.next(); - sharedCache.addPartitionToCache(dbName, tblName, part); - } - } finally { - partitionCacheLock.readLock().unlock(); + if (sharedCache == null) { + return succ; } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); + PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); + while (iterator.hasNext()) { + Partition part = iterator.next(); + sharedCache.addPartitionToCache(dbName, tblName, part); } } return succ; @@ -1014,13 +700,11 @@ public Partition getPartition(String dbName, String tblName, List part_v throws MetaException, NoSuchObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { return rawStore.getPartition(dbName, tblName, part_vals); } SharedCache sharedCache = sharedCacheWrapper.get(); - Partition part = - sharedCache.getPartitionFromCache(dbName, tblName, part_vals); + Partition part = sharedCache.getPartitionFromCache(dbName, tblName, part_vals); if (part == null) { // TODO Manage privileges throw new NoSuchObjectException("partition values=" + part_vals.toString()); @@ -1051,39 +735,35 @@ public boolean dropPartition(String dbName, String tblName, List part_va return succ; } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - // Remove partition - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - 
sharedCache.removePartitionFromCache(dbName, tblName, part_vals); - } finally { - partitionCacheLock.readLock().unlock(); - } - // Remove partition col stats - try { - // Wait if background cache update is happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals); - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); + if (sharedCache == null) { + return succ; } + sharedCache.removePartitionFromCache(dbName, tblName, part_vals); } return succ; } @Override + public void dropPartitions(String dbName, String tblName, List partNames) + throws MetaException, NoSuchObjectException { + rawStore.dropPartitions(dbName, tblName, partNames); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + if (!shouldCacheTable(dbName, tblName)) { + return; + } + SharedCache sharedCache = sharedCacheWrapper.get(); + if (sharedCache == null) { + return; + } + List> partVals = new ArrayList>(); + for (String partName : partNames) { + partVals.add(partNameToVals(partName)); + } + sharedCache.removePartitionsFromCache(dbName, tblName, partVals); + } + + @Override public List getPartitions(String dbName, String tblName, int max) throws MetaException, NoSuchObjectException { dbName = StringUtils.normalizeIdentifier(dbName); @@ -1107,70 +787,18 @@ public void alterTable(String dbName, String tblName, Table newTable) return; } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; - - if (shouldCacheTable(dbName, newTblName)) { - 
validateTableType(newTable); - // Update table cache - try { - // Wait if background cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), newTable); - } finally { - tableCacheLock.readLock().unlock(); - } - // Update partition cache (key might have changed since table name is a - // component of key) - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.alterTableInPartitionCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), newTable); - } finally { - partitionCacheLock.readLock().unlock(); - } - } else { - // Remove the table and its cached partitions, stats etc, - // since it does not pass the whitelist/blacklist filter. - // Remove table - try { - // Wait if background cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.removeTableFromCache(dbName, tblName); - } finally { - tableCacheLock.readLock().unlock(); - } - // Remove partitions - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.removePartitionsFromCache(dbName, tblName); - } finally { - partitionCacheLock.readLock().unlock(); - } - // Remove partition col stats - try { - // Wait if background cache update is happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - sharedCache.removePartitionColStatsFromCache(dbName, tblName); - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Update aggregate partition col stats keys wherever applicable - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - 
sharedCache.alterTableInAggrPartitionColStatsCache(dbName, tblName, newTable); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); - } + if (sharedCache == null) { + return; + } + if (shouldCacheTable(dbName, tblName) && shouldCacheTable(dbName, newTblName)) { + // If old table is in the cache and the new table can also be cached + sharedCache.alterTableInCache(dbName, tblName, newTable); + } else if (!shouldCacheTable(dbName, tblName) && shouldCacheTable(dbName, newTblName)) { + // If old table is *not* in the cache but the new table can be cached + sharedCache.addTableToCache(dbName, newTblName, newTable); + } else if (shouldCacheTable(dbName, tblName) && !shouldCacheTable(dbName, newTblName)) { + // If old table is in the cache but the new table *cannot* be cached + sharedCache.removeTableFromCache(dbName, tblName); } } @@ -1180,13 +808,8 @@ public void alterTable(String dbName, String tblName, Table newTable) return rawStore.getTables(dbName, pattern); } SharedCache sharedCache = sharedCacheWrapper.get(); - List tableNames = new ArrayList<>(); - for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), pattern)) { - tableNames.add(table.getTableName()); - } - } - return tableNames; + return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), pattern, + (short) -1); } @Override @@ -1196,14 +819,8 @@ public void alterTable(String dbName, String tblName, Table newTable) return rawStore.getTables(dbName, pattern, tableType); } SharedCache sharedCache = sharedCacheWrapper.get(); - List tableNames = new ArrayList<>(); - for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), pattern) && - table.getTableType().equals(tableType.toString())) { - tableNames.add(table.getTableName()); - } - } - return tableNames; + return 
sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), pattern, + tableType); } @Override @@ -1258,15 +875,7 @@ public void alterTable(String dbName, String tblName, Table newTable) return rawStore.getAllTables(dbName); } SharedCache sharedCache = sharedCacheWrapper.get(); - return getAllTablesInternal(dbName, sharedCache); - } - - private static List getAllTablesInternal(String dbName, SharedCache sharedCache) { - List tblNames = new ArrayList<>(); - for (Table tbl : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - tblNames.add(StringUtils.normalizeIdentifier(tbl.getTableName())); - } - return tblNames; + return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName)); } @Override @@ -1276,16 +885,8 @@ public void alterTable(String dbName, String tblName, Table newTable) return rawStore.listTableNamesByFilter(dbName, filter, max_tables); } SharedCache sharedCache = sharedCacheWrapper.get(); - List tableNames = new ArrayList<>(); - int count = 0; - for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), filter) - && (max_tables == -1 || count < max_tables)) { - tableNames.add(table.getTableName()); - count++; - } - } - return tableNames; + return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), filter, + max_tables); } @Override @@ -1332,34 +933,10 @@ public void alterPartition(String dbName, String tblName, List partVals, return; } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; - // Update partition cache - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); - } finally { - partitionCacheLock.readLock().unlock(); - } - // Update partition column stats cache - try { - // Wait if background cache update is 
happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart); - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); + if (sharedCache == null) { + return; } + sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); } @Override @@ -1372,42 +949,10 @@ public void alterPartitions(String dbName, String tblName, List> pa return; } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; - // Update partition cache - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - for (int i = 0; i < partValsList.size(); i++) { - List partVals = partValsList.get(i); - Partition newPart = newParts.get(i); - sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); - } - } finally { - partitionCacheLock.readLock().unlock(); - } - // Update partition column stats cache - try { - // Wait if background cache update is happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - for (int i = 0; i < partValsList.size(); i++) { - List partVals = partValsList.get(i); - Partition newPart = newParts.get(i); - sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart); - } - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - 
isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); + if (sharedCache == null) { + return; } + sharedCache.alterPartitionsInCache(dbName, tblName, partValsList, newParts); } @Override @@ -1448,18 +993,18 @@ public void alterIndex(String dbname, String baseTblName, String name, private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr, String defaultPartName, short maxParts, List result, SharedCache sharedCache) - throws MetaException, NoSuchObjectException { - List parts = sharedCache.listCachedPartitions( - StringUtils.normalizeIdentifier(table.getDbName()), - StringUtils.normalizeIdentifier(table.getTableName()), maxParts); + throws MetaException, NoSuchObjectException { + List parts = + sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(table.getDbName()), + StringUtils.normalizeIdentifier(table.getTableName()), maxParts); for (Partition part : parts) { result.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues())); } if (defaultPartName == null || defaultPartName.isEmpty()) { defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); } - return expressionProxy.filterPartitionsByExpr( - table.getPartitionKeys(), expr, defaultPartName, result); + return expressionProxy.filterPartitionsByExpr(table.getPartitionKeys(), expr, defaultPartName, + result); } @Override @@ -1824,7 +1369,9 @@ public boolean updateTableColumnStatistics(ColumnStatistics colStats) return succ; } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; + if (sharedCache == null) { + return succ; + } List statsObjs = colStats.getStatsObj(); Table tbl = getTable(dbName, tblName); List colNames = new ArrayList<>(); @@ -1832,24 +1379,8 @@ public boolean updateTableColumnStatistics(ColumnStatistics colStats) colNames.add(statsObj.getColName()); } 
StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames); - // Update table - try { - // Wait if background cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.alterTableInCache(dbName, tblName, tbl); - } finally { - tableCacheLock.readLock().unlock(); - } - // Update table col stats - try { - // Wait if background cache update is happening - tableColStatsCacheLock.readLock().lock(); - isTableColStatsCacheDirty.set(true); - sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs); - } finally { - tableColStatsCacheLock.readLock().unlock(); - } + sharedCache.alterTableInCache(dbName, tblName, tbl); + sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs); } return succ; } @@ -1864,14 +1395,8 @@ public ColumnStatistics getTableColumnStatistics(String dbName, String tblName, } SharedCache sharedCache = sharedCacheWrapper.get(); ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName); - List colStatObjs = new ArrayList<>(); - for (String colName : colNames) { - String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, colName); - ColumnStatisticsObj colStat = sharedCache.getCachedTableColStats(colStatsCacheKey); - if (colStat != null) { - colStatObjs.add(colStat); - } - } + List colStatObjs = + sharedCache.getTableColStatsFromCache(dbName, tblName, colNames); if (colStatObjs.isEmpty()) { return null; } else { @@ -1890,15 +1415,10 @@ public boolean deleteTableColumnStatistics(String dbName, String tblName, String return succ; } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - try { - // Wait if background cache update is happening - tableColStatsCacheLock.readLock().lock(); - isTableColStatsCacheDirty.set(true); - sharedCache.removeTableColStatsFromCache(dbName, tblName, colName); - } finally { - tableColStatsCacheLock.readLock().unlock(); + if (sharedCache == null) { + return succ; } + 
sharedCache.removeTableColStatsFromCache(dbName, tblName, colName); } return succ; } @@ -1914,7 +1434,9 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List statsObjs = colStats.getStatsObj(); Partition part = getPartition(dbName, tblName, partVals); List colNames = new ArrayList<>(); @@ -1922,34 +1444,8 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List getPartitionColumnStatisticsForTable(String dbName, + String tblName) { + return rawStore.getPartitionColumnStatisticsForTable(dbName, tblName); + } + + @Override public boolean deletePartitionColumnStatistics(String dbName, String tblName, String partName, List partVals, String colName) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { @@ -1974,24 +1477,10 @@ public boolean deletePartitionColumnStatistics(String dbName, String tblName, St return succ; } SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - try { - // Wait if background cache update is happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - sharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName); - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); + if (sharedCache == null) { + return succ; } + sharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName); } return succ; } @@ -2044,10 +1533,8 @@ private MergedColumnStatsForPartitions mergeColStatsForPartitions(String dbName, List colStatsWithPartInfoList = new ArrayList(); for (String partName : partNames) { 
- String colStatsCacheKey = - CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName); ColumnStatisticsObj colStatsForPart = - sharedCache.getCachedPartitionColStats(colStatsCacheKey); + sharedCache.getPartitionColStatsFromCache(dbName, tblName, partNameToVals(partName), colName); if (colStatsForPart != null) { ColStatsObjWithSourceInfo colStatsWithPartInfo = new ColStatsObjWithSourceInfo(colStatsForPart, dbName, tblName, partName); @@ -2163,52 +1650,6 @@ public void setMetaStoreSchemaVersion(String version, String comment) } @Override - public void dropPartitions(String dbName, String tblName, List partNames) - throws MetaException, NoSuchObjectException { - rawStore.dropPartitions(dbName, tblName, partNames); - dbName = StringUtils.normalizeIdentifier(dbName); - tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(dbName, tblName)) { - return; - } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; - // Remove partitions - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - for (String partName : partNames) { - List vals = partNameToVals(partName); - sharedCache.removePartitionFromCache(dbName, tblName, vals); - } - } finally { - partitionCacheLock.readLock().unlock(); - } - // Remove partition col stats - try { - // Wait if background cache update is happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - for (String partName : partNames) { - List part_vals = partNameToVals(partName); - sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals); - } - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - 
sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); - } - } - - @Override public List listPrincipalDBGrantsAll( String principalName, PrincipalType principalType) { return rawStore.listPrincipalDBGrantsAll(principalName, principalType); @@ -2470,12 +1911,6 @@ public void dropConstraint(String dbName, String tableName, return rawStore.addNotNullConstraints(nns); } - @Override - public List getPartitionColStatsForDatabase(String dbName) - throws MetaException, NoSuchObjectException { - return rawStore.getPartitionColStatsForDatabase(dbName); - } - public RawStore getRawStore() { return rawStore; } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index 32ea17495f..eb69a9ab0e 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -25,39 +25,29 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.TreeMap; - +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.hive.metastore.StatObjectConverter; -import org.apache.hadoop.hive.metastore.Warehouse; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.AggrStats; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import 
org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableMeta; -import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper; -import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper; -import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; -import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.utils.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; public class SharedCache { - private Map databaseCache = new TreeMap<>(); - private Map tableCache = new TreeMap<>(); - private Map partitionCache = new TreeMap<>(); - private Map partitionColStatsCache = new TreeMap<>(); - private Map tableColStatsCache = new TreeMap<>(); + private static ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(true); + private Map databaseCache = new ConcurrentHashMap(); + private Map tableCache = new ConcurrentHashMap(); private Map sdCache = new HashMap<>(); - private Map> aggrColStatsCache = - new HashMap>(); private static MessageDigest md; static enum StatsType { @@ -74,8 +64,6 @@ public int getPosition() { } } - private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class); - static { try { md = MessageDigest.getInstance("MD5"); @@ -84,534 +72,959 @@ public int getPosition() { } } - public synchronized Database getDatabaseFromCache(String name) { - return databaseCache.get(name)!=null?databaseCache.get(name).deepCopy():null; - } + static class TableWrapper { + Table t; + String location; + Map parameters; + byte[] sdHash; + ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock(true); + // For caching column stats for an unpartitioned table + // Key is column name and the value is the col stat object + private Map tableColStatsCache = + new HashMap(); 
+ // For caching partition objects + // Key is partition values and the value is a wrapper around the partition object + private Map partitionCache = new HashMap(); + // For caching column stats for a partitioned table + // Key is aggregate of partition values, column name and the value is the col stat object + private Map partitionColStatsCache = + new HashMap(); + // For caching aggregate column stats for all and all minus default partition + // Key is column name and the value is a list of 2 col stat objects + // (all partitions and all but default) + private Map> aggrColStatsCache = + new HashMap>(); + + TableWrapper(Table t, byte[] sdHash, String location, Map parameters) { + this.t = t; + this.sdHash = sdHash; + this.location = location; + this.parameters = parameters; + } - public synchronized void addDatabaseToCache(String dbName, Database db) { - Database dbCopy = db.deepCopy(); - dbCopy.setName(StringUtils.normalizeIdentifier(dbName)); - databaseCache.put(dbName, dbCopy); - } + public Table getTable() { + return t; + } - public synchronized void removeDatabaseFromCache(String dbName) { - databaseCache.remove(dbName); - } + public void setTable(Table t) { + this.t = t; + } - public synchronized List listCachedDatabases() { - return new ArrayList<>(databaseCache.keySet()); - } + public byte[] getSdHash() { + return sdHash; + } - public synchronized void alterDatabaseInCache(String dbName, Database newDb) { - removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbName)); - addDatabaseToCache(StringUtils.normalizeIdentifier(newDb.getName()), newDb.deepCopy()); - } + public void setSdHash(byte[] sdHash) { + this.sdHash = sdHash; + } - public synchronized int getCachedDatabaseCount() { - return databaseCache.size(); - } + public String getLocation() { + return location; + } - public synchronized Table getTableFromCache(String dbName, String tableName) { - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName)); - if (tblWrapper == 
null) { - return null; + public void setLocation(String location) { + this.location = location; } - Table t = CacheUtils.assemble(tblWrapper, this); - return t; - } - public synchronized void addTableToCache(String dbName, String tblName, Table tbl) { - Table tblCopy = tbl.deepCopy(); - tblCopy.setDbName(StringUtils.normalizeIdentifier(dbName)); - tblCopy.setTableName(StringUtils.normalizeIdentifier(tblName)); - if (tblCopy.getPartitionKeys() != null) { - for (FieldSchema fs : tblCopy.getPartitionKeys()) { - fs.setName(StringUtils.normalizeIdentifier(fs.getName())); + public Map getParameters() { + return parameters; + } + + + public void setParameters(Map parameters) { + this.parameters = parameters; + } + + void cachePartition(Partition part, SharedCache sharedCache) { + try { + tableLock.writeLock().lock(); + PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache); + partitionCache.put(CacheUtils.buildKey(part.getValues()), wrapper); + // Invalidate cached aggregate stats + if (!aggrColStatsCache.isEmpty()) { + aggrColStatsCache.clear(); + } + } finally { + tableLock.writeLock().unlock(); } } - TableWrapper wrapper; - if (tbl.getSd() != null) { - byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(tbl.getSd(), md); - StorageDescriptor sd = tbl.getSd(); - increSd(sd, sdHash); - tblCopy.setSd(null); - wrapper = new TableWrapper(tblCopy, sdHash, sd.getLocation(), sd.getParameters()); - } else { - wrapper = new TableWrapper(tblCopy, null, null, null); + + void cachePartitions(List parts, SharedCache sharedCache) { + try { + tableLock.writeLock().lock(); + for (Partition part : parts) { + PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache); + partitionCache.put(CacheUtils.buildKey(part.getValues()), wrapper); + } + // Invalidate cached aggregate stats + if (!aggrColStatsCache.isEmpty()) { + aggrColStatsCache.clear(); + } + } finally { + tableLock.writeLock().unlock(); + } + } + + public Partition getPartition(List partVals, SharedCache 
sharedCache) { + Partition part = null; + try { + tableLock.readLock().lock(); + PartitionWrapper wrapper = partitionCache.get(CacheUtils.buildKey(partVals)); + if (wrapper == null) { + return null; + } + part = CacheUtils.assemble(wrapper, sharedCache); + } finally { + tableLock.readLock().unlock(); + } + return part; + } + + public List listPartitions(int max, SharedCache sharedCache) { + List parts = new ArrayList<>(); + int count = 0; + try { + tableLock.readLock().lock(); + for (PartitionWrapper wrapper : partitionCache.values()) { + if (max == -1 || count < max) { + parts.add(CacheUtils.assemble(wrapper, sharedCache)); + count++; + } + } + } finally { + tableLock.readLock().unlock(); + } + return parts; + } + + public boolean containsPartition(List partVals) { + boolean containsPart = false; + try { + tableLock.readLock().lock(); + containsPart = partitionCache.containsKey(CacheUtils.buildKey(partVals)); + } finally { + tableLock.readLock().unlock(); + } + return containsPart; + } + + public Partition removePartition(List partVal, SharedCache sharedCache) { + Partition part = null; + try { + tableLock.writeLock().lock(); + PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildKey(partVal)); + if (wrapper.getSdHash() != null) { + sharedCache.decrSd(wrapper.getSdHash()); + } + part = CacheUtils.assemble(wrapper, sharedCache); + // Remove col stats + String partialKey = CacheUtils.buildKey(partVal); + Iterator> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + // Invalidate cached aggregate stats + if (!aggrColStatsCache.isEmpty()) { + aggrColStatsCache.clear(); + } + } finally { + tableLock.writeLock().unlock(); + } + return part; + } + + public void removePartitions(List> partVals, SharedCache sharedCache) { + try { + tableLock.writeLock().lock(); + for 
(List partVal : partVals) { + removePartition(partVal, sharedCache); + } + } finally { + tableLock.writeLock().unlock(); + } + } + + public void alterPartition(List partVals, Partition newPart, SharedCache sharedCache) { + try { + tableLock.writeLock().lock(); + removePartition(partVals, sharedCache); + cachePartition(newPart, sharedCache); + } finally { + tableLock.writeLock().unlock(); + } + } + + public void alterPartitions(List> partValsList, List newParts, + SharedCache sharedCache) { + try { + tableLock.writeLock().lock(); + for (int i = 0; i < partValsList.size(); i++) { + List partVals = partValsList.get(i); + Partition newPart = newParts.get(i); + alterPartition(partVals, newPart, sharedCache); + } + } finally { + tableLock.writeLock().unlock(); + } + } + + public void updateTableColStats(List colStatsForTable) { + try { + tableLock.readLock().lock(); + for (ColumnStatisticsObj colStatObj : colStatsForTable) { + // Get old stats object if present + String key = colStatObj.getColName(); + ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key); + if (oldStatsObj != null) { + // Update existing stat object's field + StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj); + } else { + // No stats exist for this key; add a new object to the cache + // TODO: get rid of deepCopy after making sure callers don't use references + tableColStatsCache.put(key, colStatObj.deepCopy()); + } + } + } finally { + tableLock.readLock().unlock(); + } + } + + public List getCachedTableColStats(List colNames) { + List colStatObjs = new ArrayList(); + try { + tableLock.readLock().lock(); + for (String colName : colNames) { + ColumnStatisticsObj colStatObj = tableColStatsCache.get(colName); + if (colStatObj != null) { + colStatObjs.add(colStatObj); + } + } + } finally { + tableLock.readLock().unlock(); + } + return colStatObjs; + } + + public void removeTableColStats(String colName) { + try { + tableLock.writeLock().lock(); + tableColStatsCache.remove(colName); + 
} finally { + tableLock.writeLock().unlock(); + } + } + + public ColumnStatisticsObj getPartitionColStats(List partVal, String colName) { + try { + tableLock.readLock().lock(); + return partitionColStatsCache.get(CacheUtils.buildKey(partVal, colName)); + } finally { + tableLock.readLock().unlock(); + } + } + + public void cachePartitionColStats(List partitionColStats, + SharedCache sharedCache) { + // TODO Auto-generated method stub + + } + + public void updatePartitionColStats(List partVals, + List colStatsObjs) { + try { + tableLock.writeLock().lock(); + for (ColumnStatisticsObj colStatObj : colStatsObjs) { + // Get old stats object if present + String key = CacheUtils.buildKey(partVals, colStatObj.getColName()); + ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key); + if (oldStatsObj != null) { + // Update existing stat object's field + StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj); + } else { + // No stats exist for this key; add a new object to the cache + // TODO: get rid of deepCopy after making sure callers don't use references + partitionColStatsCache.put(key, colStatObj.deepCopy()); + } + } + // Invalidate cached aggregate stats + if (!aggrColStatsCache.isEmpty()) { + aggrColStatsCache.clear(); + } + } finally { + tableLock.writeLock().unlock(); + } + } + + public void removePartitionColStats(List partVals, String colName) { + try { + tableLock.writeLock().lock(); + partitionColStatsCache.remove(CacheUtils.buildKey(partVals, colName)); + // Invalidate cached aggregate stats + if (!aggrColStatsCache.isEmpty()) { + aggrColStatsCache.clear(); + } + } finally { + tableLock.writeLock().unlock(); + } + } + + public List getAggrStats(List colNames, StatsType statsType) { + List colStats = new ArrayList(); + try { + tableLock.readLock().lock(); + for (String colName : colNames) { + List colStatList = aggrColStatsCache.get(colName); + // If unable to find stats for a column, return null so we can build stats + if (colStatList == 
null) { + return null; + } + ColumnStatisticsObj colStatObj = colStatList.get(statsType.getPosition()); + // If unable to find stats for this StatsType, return null so we can build stats + if (colStatObj == null) { + return null; + } + colStats.add(colStatObj); + } + } finally { + tableLock.readLock().unlock(); + } + return colStats; + } + + public void cacheAggrStats(AggrStats aggrStatsAllPartitions, StatsType all) { + // TODO Auto-generated method stub + + } + + private PartitionWrapper makePartitionWrapper(Partition part, SharedCache sharedCache) { + Partition partCopy = part.deepCopy(); + PartitionWrapper wrapper; + if (part.getSd() != null) { + byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(part.getSd(), md); + StorageDescriptor sd = part.getSd(); + sharedCache.increSd(sd, sdHash); + partCopy.setSd(null); + wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters()); + } else { + wrapper = new PartitionWrapper(partCopy, null, null, null); + } + return wrapper; } - tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper); } - public synchronized void removeTableFromCache(String dbName, String tblName) { - TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName)); - byte[] sdHash = tblWrapper.getSdHash(); - if (sdHash!=null) { - decrSd(sdHash); + static class PartitionWrapper { + Partition p; + String location; + Map parameters; + byte[] sdHash; + + PartitionWrapper(Partition p, byte[] sdHash, String location, Map parameters) { + this.p = p; + this.sdHash = sdHash; + this.location = location; + this.parameters = parameters; + } + + public Partition getPartition() { + return p; + } + + public byte[] getSdHash() { + return sdHash; + } + + public String getLocation() { + return location; + } + + public Map getParameters() { + return parameters; } } - public synchronized ColumnStatisticsObj getCachedTableColStats(String colStatsCacheKey) { - return 
tableColStatsCache.get(colStatsCacheKey)!=null?tableColStatsCache.get(colStatsCacheKey).deepCopy():null; + static class StorageDescriptorWrapper { + StorageDescriptor sd; + int refCount = 0; + + StorageDescriptorWrapper(StorageDescriptor sd, int refCount) { + this.sd = sd; + this.refCount = refCount; + } + + public StorageDescriptor getSd() { + return sd; + } + + public int getRefCount() { + return refCount; + } } - public synchronized void removeTableColStatsFromCache(String dbName, String tblName) { - String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); - Iterator> iterator = - tableColStatsCache.entrySet().iterator(); - while (iterator.hasNext()) { - Entry entry = iterator.next(); - String key = entry.getKey(); - if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { - iterator.remove(); + public Database getDatabaseFromCache(String name) { + Database db = null; + try { + cacheLock.readLock().lock(); + if (databaseCache.get(name) != null) { + db = databaseCache.get(name).deepCopy(); } + } finally { + cacheLock.readLock().unlock(); } + return db; } - public synchronized void removeTableColStatsFromCache(String dbName, String tblName, - String colName) { - if (colName == null) { - removeTableColStatsFromCache(dbName, tblName); - } else { - tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName)); + public void addDatabaseToCache(Database db) { + try { + cacheLock.writeLock().lock(); + Database dbCopy = db.deepCopy(); + databaseCache.put(StringUtils.normalizeIdentifier(dbCopy.getName()), dbCopy); + } finally { + cacheLock.writeLock().unlock(); } } - public synchronized void updateTableColStatsInCache(String dbName, String tableName, - List colStatsForTable) { - for (ColumnStatisticsObj colStatObj : colStatsForTable) { - // Get old stats object if present - String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName()); - ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key); - if (oldStatsObj != null) { - 
LOG.debug("CachedStore: updating table column stats for column: " + colStatObj.getColName() - + ", of table: " + tableName + " and database: " + dbName); - // Update existing stat object's field - StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj); - } else { - // No stats exist for this key; add a new object to the cache - tableColStatsCache.put(key, colStatObj); - } + public void removeDatabaseFromCache(String dbName) { + try { + cacheLock.writeLock().lock(); + databaseCache.remove(dbName); + } finally { + cacheLock.writeLock().unlock(); } } - public synchronized void alterTableInCache(String dbName, String tblName, Table newTable) { - removeTableFromCache(dbName, tblName); - addTableToCache(StringUtils.normalizeIdentifier(newTable.getDbName()), - StringUtils.normalizeIdentifier(newTable.getTableName()), newTable); + public List listCachedDatabases() { + List results = new ArrayList<>(); + try { + cacheLock.readLock().lock(); + results.addAll(databaseCache.keySet()); + } finally { + cacheLock.readLock().unlock(); + } + return results; } - public synchronized void alterTableInPartitionCache(String dbName, String tblName, - Table newTable) { - if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { - List partitions = listCachedPartitions(dbName, tblName, -1); - for (Partition part : partitions) { - removePartitionFromCache(part.getDbName(), part.getTableName(), part.getValues()); - part.setDbName(StringUtils.normalizeIdentifier(newTable.getDbName())); - part.setTableName(StringUtils.normalizeIdentifier(newTable.getTableName())); - addPartitionToCache(StringUtils.normalizeIdentifier(newTable.getDbName()), - StringUtils.normalizeIdentifier(newTable.getTableName()), part); + public List listCachedDatabases(String pattern) { + List results = new ArrayList<>(); + try { + cacheLock.readLock().lock(); + for (String dbName : databaseCache.keySet()) { + dbName = StringUtils.normalizeIdentifier(dbName); + if 
(CacheUtils.matches(dbName, pattern)) { + results.add(dbName); + } } + } finally { + cacheLock.readLock().unlock(); } + return results; } - public synchronized void alterTableInTableColStatsCache(String dbName, String tblName, - Table newTable) { - if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { - String oldPartialTableStatsKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); - Iterator> iterator = - tableColStatsCache.entrySet().iterator(); - Map newTableColStats = - new HashMap<>(); - while (iterator.hasNext()) { - Entry entry = iterator.next(); - String key = entry.getKey(); - ColumnStatisticsObj colStatObj = entry.getValue(); - if (key.toLowerCase().startsWith(oldPartialTableStatsKey.toLowerCase())) { - String[] decomposedKey = CacheUtils.splitTableColStats(key); - String newKey = CacheUtils.buildKey(decomposedKey[0], decomposedKey[1], decomposedKey[2]); - newTableColStats.put(newKey, colStatObj); - iterator.remove(); - } + public void alterDatabaseInCache(String dbName, Database newDb) { + try { + cacheLock.writeLock().lock(); + removeDatabaseFromCache(dbName); + addDatabaseToCache(newDb.deepCopy()); + } finally { + cacheLock.writeLock().unlock(); + } + } + + public void refreshDatabasesInCache(List databases) { + try { + cacheLock.writeLock().lock(); + databaseCache.clear(); + for (Database db : databases) { + addDatabaseToCache(db.deepCopy()); } - tableColStatsCache.putAll(newTableColStats); + } finally { + cacheLock.writeLock().unlock(); } } - public synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName, - Table newTable) { - if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { - List partitions = listCachedPartitions(dbName, tblName, -1); - Map newPartitionColStats = new HashMap<>(); - for (Partition part : partitions) { - String oldPartialPartitionKey = - CacheUtils.buildKeyWithDelimit(dbName, tblName, part.getValues()); - Iterator> iterator = - 
partitionColStatsCache.entrySet().iterator(); - while (iterator.hasNext()) { - Entry entry = iterator.next(); - String key = entry.getKey(); - ColumnStatisticsObj colStatObj = entry.getValue(); - if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) { - Object[] decomposedKey = CacheUtils.splitPartitionColStats(key); - // New key has the new table name - String newKey = CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(), - (List) decomposedKey[2], (String) decomposedKey[3]); - newPartitionColStats.put(newKey, colStatObj); - iterator.remove(); - } + public int getCachedDatabaseCount() { + try { + cacheLock.readLock().lock(); + return databaseCache.size(); + } finally { + cacheLock.readLock().unlock(); + } + } + + public void updateTableCache(Table table, ColumnStatistics tableColStats, + List partitions, List partitionColStats, + AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) { + try { + cacheLock.writeLock().lock(); + TableWrapper tblWrapper = addTableToCache(table.getDbName(), table.getTableName(), table); + if (tblWrapper != null) { + if ((tableColStats != null) && (tableColStats.getStatsObj() != null)) { + tblWrapper.updateTableColStats(tableColStats.getStatsObj()); } - } - partitionColStatsCache.putAll(newPartitionColStats); - } - } - - public synchronized void alterTableInAggrPartitionColStatsCache(String dbName, String tblName, - Table newTable) { - if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { - Map> newAggrColStatsCache = - new HashMap>(); - String oldPartialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); - Iterator>> iterator = - aggrColStatsCache.entrySet().iterator(); - while (iterator.hasNext()) { - Entry> entry = iterator.next(); - String key = entry.getKey(); - List value = entry.getValue(); - if (key.toLowerCase().startsWith(oldPartialKey.toLowerCase())) { - Object[] decomposedKey = CacheUtils.splitAggrColStats(key); - // New key has 
the new table name - String newKey = CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(), - (String) decomposedKey[2]); - newAggrColStatsCache.put(newKey, value); - iterator.remove(); + if (partitions != null) { + tblWrapper.cachePartitions(partitions, this); + } + if (partitionColStats != null) { + tblWrapper.cachePartitionColStats(partitionColStats, this); + } + if (aggrStatsAllPartitions != null) { + tblWrapper.cacheAggrStats(aggrStatsAllPartitions, StatsType.ALL); + } + if (aggrStatsAllButDefaultPartition != null) { + tblWrapper.cacheAggrStats(aggrStatsAllButDefaultPartition, StatsType.ALLBUTDEFAULT); } } - aggrColStatsCache.putAll(newAggrColStatsCache); + } finally { + cacheLock.writeLock().unlock(); } } - public synchronized int getCachedTableCount() { - return tableCache.size(); + public void refreshTableCache(Table table, ColumnStatistics tableColStats, + List partitions, List partitionColStats, + AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) { + try { + cacheLock.writeLock().lock(); + String dbName = StringUtils.normalizeIdentifier(table.getDbName()); + String tblName = StringUtils.normalizeIdentifier(table.getTableName()); + TableWrapper tblWrapper = + tableCache.remove(CacheUtils.buildKey(dbName, tblName)); + byte[] sdHash = tblWrapper.getSdHash(); + if (sdHash != null) { + decrSd(sdHash); + } + addTableToCache(dbName, tblName, table); + updateTableCache(table, tableColStats, partitions, partitionColStats, + aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); + } finally { + cacheLock.writeLock().unlock(); + } } - public synchronized List
listCachedTables(String dbName) { - List
tables = new ArrayList<>(); - for (TableWrapper wrapper : tableCache.values()) { - if (wrapper.getTable().getDbName().equals(dbName)) { - tables.add(CacheUtils.assemble(wrapper, this)); + public Table getTableFromCache(String dbName, String tableName) { + Table t = null; + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName)); + if (tblWrapper != null) { + t = CacheUtils.assemble(tblWrapper, this); } + } finally { + cacheLock.readLock().unlock(); } - return tables; + return t; } - public synchronized List getTableMeta(String dbNames, String tableNames, List tableTypes) { - List tableMetas = new ArrayList<>(); - for (String dbName : listCachedDatabases()) { - if (CacheUtils.matches(dbName, dbNames)) { - for (Table table : listCachedTables(dbName)) { - if (CacheUtils.matches(table.getTableName(), tableNames)) { - if (tableTypes==null || tableTypes.contains(table.getTableType())) { - TableMeta metaData = new TableMeta( - dbName, table.getTableName(), table.getTableType()); - metaData.setComments(table.getParameters().get("comment")); - tableMetas.add(metaData); - } - } + public TableWrapper addTableToCache(String dbName, String tblName, Table tbl) { + try { + cacheLock.writeLock().lock(); + Table tblCopy = tbl.deepCopy(); + tblCopy.setDbName(StringUtils.normalizeIdentifier(dbName)); + tblCopy.setTableName(StringUtils.normalizeIdentifier(tblName)); + if (tblCopy.getPartitionKeys() != null) { + for (FieldSchema fs : tblCopy.getPartitionKeys()) { + fs.setName(StringUtils.normalizeIdentifier(fs.getName())); } } + TableWrapper wrapper; + if (tbl.getSd() != null) { + byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(tbl.getSd(), md); + StorageDescriptor sd = tbl.getSd(); + increSd(sd, sdHash); + tblCopy.setSd(null); + wrapper = new TableWrapper(tblCopy, sdHash, sd.getLocation(), sd.getParameters()); + } else { + wrapper = new TableWrapper(tblCopy, null, null, null); + } + wrapper = 
tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper); + return wrapper; + } finally { + cacheLock.writeLock().unlock(); } - return tableMetas; } - public synchronized void addPartitionToCache(String dbName, String tblName, Partition part) { - Partition partCopy = part.deepCopy(); - PartitionWrapper wrapper; - if (part.getSd()!=null) { - byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(part.getSd(), md); - StorageDescriptor sd = part.getSd(); - increSd(sd, sdHash); - partCopy.setSd(null); - wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters()); - } else { - wrapper = new PartitionWrapper(partCopy, null, null, null); + public void removeTableFromCache(String dbName, String tblName) { + try { + cacheLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName)); + byte[] sdHash = tblWrapper.getSdHash(); + if (sdHash != null) { + decrSd(sdHash); + } + } finally { + cacheLock.writeLock().unlock(); } - partitionCache.put(CacheUtils.buildKey(dbName, tblName, part.getValues()), wrapper); } - public synchronized Partition getPartitionFromCache(String key) { - PartitionWrapper wrapper = partitionCache.get(key); - if (wrapper == null) { - return null; + public void alterTableInCache(String dbName, String tblName, Table newTable) { + try { + cacheLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName)); + if (tblWrapper != null) { + byte[] sdHash = tblWrapper.getSdHash(); + // Remove old table object's sd hash + if (sdHash != null) { + decrSd(sdHash); + } + Table tblCopy = newTable.deepCopy(); + if (tblCopy.getPartitionKeys() != null) { + for (FieldSchema fs : tblCopy.getPartitionKeys()) { + fs.setName(StringUtils.normalizeIdentifier(fs.getName())); + } + } + tblWrapper.setTable(tblCopy); + if (tblCopy.getSd() != null) { + sdHash = MetaStoreUtils.hashStorageDescriptor(tblCopy.getSd(), md); + StorageDescriptor sd = tblCopy.getSd(); + 
increSd(sd, sdHash); + tblCopy.setSd(null); + tblWrapper.setSdHash(sdHash); + tblWrapper.setLocation(sd.getLocation()); + tblWrapper.setParameters(sd.getParameters()); + } else { + tblWrapper.setSdHash(null); + tblWrapper.setLocation(null); + tblWrapper.setParameters(null); + } + String newDbName = StringUtils.normalizeIdentifier(tblCopy.getDbName()); + String newTblName = StringUtils.normalizeIdentifier(tblCopy.getTableName()); + tableCache.put(CacheUtils.buildKey(newDbName, newTblName), tblWrapper); + } + } finally { + cacheLock.writeLock().unlock(); } - Partition p = CacheUtils.assemble(wrapper, this); - return p; } - public synchronized Partition getPartitionFromCache(String dbName, String tblName, List part_vals) { - return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, part_vals)); + public List
listCachedTables(String dbName) { + List
tables = new ArrayList<>(); + try { + cacheLock.readLock().lock(); + for (TableWrapper wrapper : tableCache.values()) { + if (wrapper.getTable().getDbName().equals(dbName)) { + tables.add(CacheUtils.assemble(wrapper, this)); + } + } + } finally { + cacheLock.readLock().unlock(); + } + return tables; } - public synchronized boolean existPartitionFromCache(String dbName, String tblName, List part_vals) { - return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals)); + public List listCachedTableNames(String dbName) { + List tableNames = new ArrayList<>(); + try { + cacheLock.readLock().lock(); + for (TableWrapper wrapper : tableCache.values()) { + if (wrapper.getTable().getDbName().equals(dbName)) { + tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName())); + } + } + } finally { + cacheLock.readLock().unlock(); + } + return tableNames; } - public synchronized Partition removePartitionFromCache(String dbName, String tblName, - List part_vals) { - PartitionWrapper wrapper = - partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals)); - if (wrapper.getSdHash() != null) { - decrSd(wrapper.getSdHash()); + public List listCachedTableNames(String dbName, String pattern, short maxTables) { + List tableNames = new ArrayList(); + try { + cacheLock.readLock().lock(); + int count = 0; + for (TableWrapper wrapper : tableCache.values()) { + if ((wrapper.getTable().getDbName().equals(dbName)) + && CacheUtils.matches(wrapper.getTable().getTableName(), pattern) + && (maxTables == -1 || count < maxTables)) { + tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName())); + count++; + } + } + } finally { + cacheLock.readLock().unlock(); } - return wrapper.getPartition(); + return tableNames; } - /** - * Given a db + table, remove all partitions for this table from the cache - * @param dbName - * @param tblName - * @return - */ - public synchronized void removePartitionsFromCache(String dbName, String 
tblName) { - String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); - Iterator> iterator = partitionCache.entrySet().iterator(); - while (iterator.hasNext()) { - Entry entry = iterator.next(); - String key = entry.getKey(); - PartitionWrapper wrapper = entry.getValue(); - if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { - iterator.remove(); - if (wrapper.getSdHash() != null) { - decrSd(wrapper.getSdHash()); + public List listCachedTableNames(String dbName, String pattern, TableType tableType) { + List tableNames = new ArrayList(); + try { + cacheLock.readLock().lock(); + for (TableWrapper wrapper : tableCache.values()) { + if ((wrapper.getTable().getDbName().equals(dbName)) + && CacheUtils.matches(wrapper.getTable().getTableName(), pattern) + && wrapper.getTable().getTableType().equals(tableType.toString())) { + tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName())); } } + } finally { + cacheLock.readLock().unlock(); } + return tableNames; } - // Remove cached column stats for all partitions of all tables in a db - public synchronized void removePartitionColStatsFromCache(String dbName) { - String partialKey = CacheUtils.buildKeyWithDelimit(dbName); - Iterator> iterator = - partitionColStatsCache.entrySet().iterator(); - while (iterator.hasNext()) { - Entry entry = iterator.next(); - String key = entry.getKey(); - if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { - iterator.remove(); + public List getTableColStatsFromCache(String dbName, String tblName, + List colNames) { + List colStatObjs = new ArrayList(); + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if (tblWrapper != null) { + colStatObjs = tblWrapper.getCachedTableColStats(colNames); } + } finally { + cacheLock.readLock().unlock(); } + return colStatObjs; } - // Remove cached column stats for all partitions of a table - public synchronized void 
removePartitionColStatsFromCache(String dbName, String tblName) { - String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); - Iterator> iterator = - partitionColStatsCache.entrySet().iterator(); - while (iterator.hasNext()) { - Entry entry = iterator.next(); - String key = entry.getKey(); - if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { - iterator.remove(); + public void removeTableColStatsFromCache(String dbName, String tblName, String colName) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if (tblWrapper != null) { + tblWrapper.removeTableColStats(colName); } + } finally { + cacheLock.readLock().unlock(); } } - // Remove cached column stats for a particular partition of a table - public synchronized void removePartitionColStatsFromCache(String dbName, String tblName, - List partVals) { - String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals); - Iterator> iterator = - partitionColStatsCache.entrySet().iterator(); - while (iterator.hasNext()) { - Entry entry = iterator.next(); - String key = entry.getKey(); - if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { - iterator.remove(); + public void updateTableColStatsInCache(String dbName, String tableName, + List colStatsForTable) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName)); + if (tblWrapper != null) { + tblWrapper.updateTableColStats(colStatsForTable); } + } finally { + cacheLock.readLock().unlock(); } } - // Remove cached column stats for a particular partition and a particular column of a table - public synchronized void removePartitionColStatsFromCache(String dbName, String tblName, - List partVals, String colName) { - partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName)); + public int getCachedTableCount() { + try { + cacheLock.readLock().lock(); + return 
tableCache.size(); + } finally { + cacheLock.readLock().unlock(); + } } - public synchronized List listCachedPartitions(String dbName, String tblName, int max) { - List partitions = new ArrayList<>(); - int count = 0; - for (PartitionWrapper wrapper : partitionCache.values()) { - if (wrapper.getPartition().getDbName().equals(dbName) - && wrapper.getPartition().getTableName().equals(tblName) - && (max == -1 || count < max)) { - partitions.add(CacheUtils.assemble(wrapper, this)); - count++; + public List getTableMeta(String dbNames, String tableNames, + List tableTypes) { + List tableMetas = new ArrayList<>(); + try { + cacheLock.readLock().lock(); + for (String dbName : listCachedDatabases()) { + if (CacheUtils.matches(dbName, dbNames)) { + for (Table table : listCachedTables(dbName)) { + if (CacheUtils.matches(table.getTableName(), tableNames)) { + if (tableTypes == null || tableTypes.contains(table.getTableType())) { + TableMeta metaData = + new TableMeta(dbName, table.getTableName(), table.getTableType()); + metaData.setComments(table.getParameters().get("comment")); + tableMetas.add(metaData); + } + } + } + } } + } finally { + cacheLock.readLock().unlock(); } - return partitions; + return tableMetas; } - public synchronized void alterPartitionInCache(String dbName, String tblName, - List partVals, Partition newPart) { - removePartitionFromCache(dbName, tblName, partVals); - addPartitionToCache(StringUtils.normalizeIdentifier(newPart.getDbName()), - StringUtils.normalizeIdentifier(newPart.getTableName()), newPart); + public void addPartitionToCache(String dbName, String tblName, Partition part) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if (tblWrapper != null) { + tblWrapper.cachePartition(part, this); + } + } finally { + cacheLock.readLock().unlock(); + } } - public synchronized void alterPartitionInColStatsCache(String dbName, String tblName, - List partVals, Partition newPart) { 
- String oldPartialPartitionKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals); - Map newPartitionColStats = new HashMap<>(); - Iterator> iterator = - partitionColStatsCache.entrySet().iterator(); - while (iterator.hasNext()) { - Entry entry = iterator.next(); - String key = entry.getKey(); - ColumnStatisticsObj colStatObj = entry.getValue(); - if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) { - Object[] decomposedKey = CacheUtils.splitPartitionColStats(key); - String newKey = - CacheUtils.buildKey(StringUtils.normalizeIdentifier(newPart.getDbName()), - StringUtils.normalizeIdentifier(newPart.getTableName()), newPart.getValues(), - (String) decomposedKey[3]); - newPartitionColStats.put(newKey, colStatObj); - iterator.remove(); + public void addPartitionsToCache(String dbName, String tblName, List parts) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if (tblWrapper != null) { + tblWrapper.cachePartitions(parts, this); } + } finally { + cacheLock.readLock().unlock(); } - partitionColStatsCache.putAll(newPartitionColStats); } - public synchronized void updatePartitionColStatsInCache(String dbName, String tableName, - List partVals, List colStatsObjs) { - for (ColumnStatisticsObj colStatObj : colStatsObjs) { - // Get old stats object if present - String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName()); - ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key); - if (oldStatsObj != null) { - // Update existing stat object's field - LOG.debug("CachedStore: updating partition column stats for column: " - + colStatObj.getColName() + ", of table: " + tableName + " and database: " + dbName); - StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj); - } else { - // No stats exist for this key; add a new object to the cache - partitionColStatsCache.put(key, colStatObj); + public Partition 
getPartitionFromCache(String dbName, String tblName, + List partVals) { + Partition part = null; + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if (tblWrapper != null) { + part = tblWrapper.getPartition(partVals, this); } + } finally { + cacheLock.readLock().unlock(); } + return part; } - public synchronized int getCachedPartitionCount() { - return partitionCache.size(); + public boolean existPartitionFromCache(String dbName, String tblName, List partVals) { + boolean existsPart = false; + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if (tblWrapper != null) { + existsPart = tblWrapper.containsPartition(partVals); + } + } finally { + cacheLock.readLock().unlock(); + } + return existsPart; } - public synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) { - return partitionColStatsCache.get(key)!=null?partitionColStatsCache.get(key).deepCopy():null; + public Partition removePartitionFromCache(String dbName, String tblName, + List partVals) { + Partition part = null; + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if (tblWrapper != null) { + part = tblWrapper.removePartition(partVals, this); + } + } finally { + cacheLock.readLock().unlock(); + } + return part; } - public synchronized void addPartitionColStatsToCache( - List colStatsForDB) { - for (ColStatsObjWithSourceInfo colStatWithSourceInfo : colStatsForDB) { - List partVals; - try { - partVals = Warehouse.getPartValuesFromPartName(colStatWithSourceInfo.getPartName()); - ColumnStatisticsObj colStatObj = colStatWithSourceInfo.getColStatsObj(); - String key = CacheUtils.buildKey(colStatWithSourceInfo.getDbName(), - colStatWithSourceInfo.getTblName(), partVals, colStatObj.getColName()); - partitionColStatsCache.put(key, colStatObj); - } catch (MetaException e) { - 
LOG.info("Unable to add partition stats for: {} to SharedCache", - colStatWithSourceInfo.getPartName(), e); + public void removePartitionsFromCache(String dbName, String tblName, + List> partVals) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if (tblWrapper != null) { + tblWrapper.removePartitions(partVals, this); } + } finally { + cacheLock.readLock().unlock(); } - } - public synchronized void refreshPartitionColStats(String dbName, - List colStatsForDB) { - LOG.debug("CachedStore: updating cached partition column stats objects for database: {}", - dbName); - removePartitionColStatsFromCache(dbName); - addPartitionColStatsToCache(colStatsForDB); + public List listCachedPartitions(String dbName, String tblName, int max) { + List parts = new ArrayList(); + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if (tblWrapper != null) { + parts = tblWrapper.listPartitions(max, this); + } + } finally { + cacheLock.readLock().unlock(); + } + return parts; } - public synchronized void addAggregateStatsToCache(String dbName, String tblName, - AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) { - if (aggrStatsAllPartitions != null) { - for (ColumnStatisticsObj colStatObj : aggrStatsAllPartitions.getColStats()) { - String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName()); - List value = new ArrayList(); - value.add(StatsType.ALL.getPosition(), colStatObj); - aggrColStatsCache.put(key, value); - } - } - if (aggrStatsAllButDefaultPartition != null) { - for (ColumnStatisticsObj colStatObj : aggrStatsAllButDefaultPartition.getColStats()) { - String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName()); - List value = aggrColStatsCache.get(key); - if ((value != null) && (value.size() > 0)) { - value.add(StatsType.ALLBUTDEFAULT.getPosition(), colStatObj); - } + public void 
alterPartitionInCache(String dbName, String tblName, List partVals, + Partition newPart) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if (tblWrapper != null) { + tblWrapper.alterPartition(partVals, newPart, this); } + } finally { + cacheLock.readLock().unlock(); } } - public List getAggrStatsFromCache(String dbName, String tblName, - List colNames, StatsType statsType) { - List colStats = new ArrayList(); - for (String colName : colNames) { - String key = CacheUtils.buildKey(dbName, tblName, colName); - List colStatList = aggrColStatsCache.get(key); - // If unable to find stats for a column, return null so we can build stats - if (colStatList == null) { - return null; - } - ColumnStatisticsObj colStatObj = colStatList.get(statsType.getPosition()); - // If unable to find stats for this StatsType, return null so we can build - // stats - if (colStatObj == null) { - return null; + public void alterPartitionsInCache(String dbName, String tblName, List> partValsList, + List newParts) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if (tblWrapper != null) { + tblWrapper.alterPartitions(partValsList, newParts, this); } - colStats.add(colStatObj); + } finally { + cacheLock.readLock().unlock(); } - return colStats; } - public synchronized void removeAggrPartitionColStatsFromCache(String dbName, String tblName) { - String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); - Iterator>> iterator = - aggrColStatsCache.entrySet().iterator(); - while (iterator.hasNext()) { - Entry> entry = iterator.next(); - String key = entry.getKey(); - if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { - iterator.remove(); + public void removePartitionColStatsFromCache(String dbName, String tblName, + List partVals, String colName) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = 
tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if (tblWrapper != null) { + tblWrapper.removePartitionColStats(partVals, colName); } + } finally { + cacheLock.readLock().unlock(); } } - public synchronized void refreshAggregateStatsCache(String dbName, String tblName, - AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) { - LOG.debug("CachedStore: updating aggregate stats cache for database: {}, table: {}", dbName, - tblName); - removeAggrPartitionColStatsFromCache(dbName, tblName); - addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions, - aggrStatsAllButDefaultPartition); + public void updatePartitionColStatsInCache(String dbName, String tableName, + List partVals, List colStatsObjs) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName)); + if (tblWrapper != null) { + tblWrapper.updatePartitionColStats(partVals, colStatsObjs); + } + } finally { + cacheLock.readLock().unlock(); + } } - public synchronized void addTableColStatsToCache(String dbName, String tableName, - List colStatsForTable) { - for (ColumnStatisticsObj colStatObj : colStatsForTable) { - String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName()); - tableColStatsCache.put(key, colStatObj); +// public int getCachedPartitionCount() { +// int partitionCount = 0; +// try { +// cacheLock.readLock().lock(); +// for () { +// +// } +// } finally { +// cacheLock.readLock().unlock(); +// } +// return partitionCount; +// } + + public ColumnStatisticsObj getPartitionColStatsFromCache(String dbName, String tblName, + List partVal, String colName) { + ColumnStatisticsObj colStatObj = null; + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if (tblWrapper != null) { + colStatObj = tblWrapper.getPartitionColStats(partVal, colName); + } + } finally { + cacheLock.readLock().unlock(); } + return colStatObj; } - public 
synchronized void refreshTableColStats(String dbName, String tableName, - List colStatsForTable) { - LOG.debug("CachedStore: updating cached table column stats objects for database: " + dbName - + " and table: " + tableName); - // Remove all old cache entries for this table - removeTableColStatsFromCache(dbName, tableName); - // Add new entries to cache - addTableColStatsToCache(dbName, tableName, colStatsForTable); + public List getAggrStatsFromCache(String dbName, String tblName, + List colNames, StatsType statsType) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if (tblWrapper != null) { + return tblWrapper.getAggrStats(colNames, statsType); + } + } finally { + cacheLock.readLock().unlock(); + } + return null; } public void increSd(StorageDescriptor sd, byte[] sdHash) { @@ -640,45 +1053,6 @@ public StorageDescriptor getSdFromCache(byte[] sdHash) { return sdWrapper.getSd(); } - // Replace databases in databaseCache with the new list - public synchronized void refreshDatabases(List databases) { - LOG.debug("CachedStore: updating cached database objects"); - for (String dbName : listCachedDatabases()) { - removeDatabaseFromCache(dbName); - } - for (Database db : databases) { - addDatabaseToCache(db.getName(), db); - } - } - - // Replace tables in tableCache with the new list - public synchronized void refreshTables(String dbName, List
tables) { - LOG.debug("CachedStore: updating cached table objects for database: " + dbName); - for (Table tbl : listCachedTables(dbName)) { - removeTableFromCache(dbName, tbl.getTableName()); - } - for (Table tbl : tables) { - addTableToCache(dbName, tbl.getTableName(), tbl); - } - } - - public synchronized void refreshPartitions(String dbName, String tblName, - List partitions) { - LOG.debug("CachedStore: updating cached partition objects for database: " + dbName - + " and table: " + tblName); - Iterator> iterator = partitionCache.entrySet().iterator(); - while (iterator.hasNext()) { - PartitionWrapper partitionWrapper = iterator.next().getValue(); - if (partitionWrapper.getPartition().getDbName().equals(dbName) - && partitionWrapper.getPartition().getTableName().equals(tblName)) { - iterator.remove(); - } - } - for (Partition part : partitions) { - addPartitionToCache(dbName, tblName, part); - } - } - @VisibleForTesting Map getDatabaseCache() { return databaseCache; @@ -690,17 +1064,7 @@ public synchronized void refreshPartitions(String dbName, String tblName, } @VisibleForTesting - Map getPartitionCache() { - return partitionCache; - } - - @VisibleForTesting Map getSdCache() { return sdCache; } - - @VisibleForTesting - Map getPartitionColStatsCache() { - return partitionColStatsCache; - } }