diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index 78b26374f2..6ec682723c 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -998,7 +998,7 @@ public WMFullResourcePlan getResourcePlan(String name) throws NoSuchObjectExcept
   public List<WMResourcePlan> getAllResourcePlans() throws MetaException {
     return objectStore.getAllResourcePlans();
   }
-
+
   @Override
   public WMFullResourcePlan alterResourcePlan(String name, WMNullableResourcePlan resourcePlan,
       boolean canActivateDisabled, boolean canDeactivate, boolean isReplace)
@@ -1090,11 +1090,4 @@ public void dropWMTriggerToPoolMapping(String resourcePlanName, String triggerNa
       String poolPath) throws NoSuchObjectException, InvalidOperationException, MetaException {
     objectStore.dropWMTriggerToPoolMapping(resourcePlanName, triggerName, poolPath);
   }
-
-  @Override
-  public List<ColumnStatistics> getPartitionColStatsForDatabase(String dbName)
-      throws MetaException, NoSuchObjectException {
-    // TODO Auto-generated method stub
-    return null;
-  }
 }
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index d58ed677f3..b76e70a2f4 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -8011,35 +8011,6 @@ protected String describeResult() {
   }
 
   @Override
-  public List<ColumnStatistics> getPartitionColStatsForDatabase(String dbName)
-      throws MetaException, NoSuchObjectException {
-    final boolean enableBitVector =
-        MetastoreConf.getBoolVar(getConf(), ConfVars.STATS_FETCH_BITVECTOR);
-    return new GetHelper<List<ColumnStatistics>>(dbName, null, true, false) {
-      @Override
-      protected List<ColumnStatistics> getSqlResult(
-          GetHelper<List<ColumnStatistics>> ctx) throws MetaException {
-        return directSql.getColStatsForAllTablePartitions(dbName, enableBitVector);
-      }
-
-      @Override
-      protected List<ColumnStatistics> getJdoResult(
-          GetHelper<List<ColumnStatistics>> ctx)
-          throws MetaException, NoSuchObjectException {
-        // This is fast path for query optimizations, if we can find this info
-        // quickly using directSql, do it. No point in failing back to slow path
-        // here.
-        throw new MetaException("Jdo path is not implemented for getPartitionColStatsForDatabase.");
-      }
-
-      @Override
-      protected String describeResult() {
-        return null;
-      }
-    }.run(true);
-  }
-
-  @Override
   public void flushCache() {
     // NOP as there's no caching
   }
@@ -9967,7 +9938,7 @@ public WMFullResourcePlan alterResourcePlan(String name, WMNullableResourcePlan
       } else {
         result = handleSimpleAlter(name, changes, canActivateDisabled, canDeactivate);
       }
-
+
       commited = commitTransaction();
       return result;
     } catch (Exception e) {
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
index e4e7d4239d..0486f656bf 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java
@@ -602,17 +602,6 @@ AggrStats get_aggr_stats_for(String dbName, String tblName,
       List<String> partNames, List<String> colNames) throws MetaException, NoSuchObjectException;
 
   /**
-   * Get column stats for all partitions of all tables in the database
-   *
-   * @param dbName
-   * @return List of column stats objects for all partitions of all tables in the database
-   * @throws MetaException
-   * @throws NoSuchObjectException
-   */
-  List<ColumnStatistics> getPartitionColStatsForDatabase(String dbName)
-      throws MetaException, NoSuchObjectException;
-
-  /**
    * Get the next notification event.
    * @param rqst Request containing information on the last processed notification.
    * @return list of notifications, sorted by eventId
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
index f0f650ddcf..8c9fb285e5 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CacheUtils.java
@@ -27,8 +27,8 @@
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper;
-import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper;
+import org.apache.hadoop.hive.metastore.cache.SharedCache.PartitionWrapper;
+import org.apache.hadoop.hive.metastore.cache.SharedCache.TableWrapper;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 
 public class CacheUtils {
@@ -59,6 +59,19 @@ public static String buildKey(String dbName, String tableName, List part
     return key;
   }
 
+  public static String buildKey(List<String> partVals) {
+    String key = "";
+    if (CollectionUtils.isNotEmpty(partVals)) {
+      key += String.join(delimit, partVals);
+    }
+    return key;
+  }
+
+  public static String buildKey(List<String> partVals, String colName) {
+    String key = buildKey(partVals);
+    return key + delimit + colName;
+  }
+
   public static String buildKeyWithDelimit(String dbName, String tableName, List<String> partVals) {
     return buildKey(dbName, tableName, partVals) + delimit;
   }
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 80aa3bcdb4..2bc70f6662 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ 
b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -31,8 +31,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -90,7 +88,6 @@ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint; import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey; import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableMeta; import org.apache.hadoop.hive.metastore.api.Type; @@ -118,28 +115,12 @@ // TODO constraintCache // TODO need sd nested copy? // TODO String intern -// TODO restructure HBaseStore // TODO monitor event queue // TODO initial load slow? // TODO size estimation -// TODO factor in extrapolation logic (using partitions found) during aggregate stats calculation public class CachedStore implements RawStore, Configurable { private static ScheduledExecutorService cacheUpdateMaster = null; - private static ReentrantReadWriteLock databaseCacheLock = new ReentrantReadWriteLock(true); - private static AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false); - private static ReentrantReadWriteLock tableCacheLock = new ReentrantReadWriteLock(true); - private static AtomicBoolean isTableCacheDirty = new AtomicBoolean(false); - private static ReentrantReadWriteLock partitionCacheLock = new ReentrantReadWriteLock(true); - private static AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false); - private static ReentrantReadWriteLock tableColStatsCacheLock = new ReentrantReadWriteLock(true); - private static AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false); - private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock( - true); - private static ReentrantReadWriteLock partitionAggrColStatsCacheLock = - new ReentrantReadWriteLock(true); - private static AtomicBoolean isPartitionAggrColStatsCacheDirty = new AtomicBoolean(false); - private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false); private static List whitelistPatterns = null; private static List blacklistPatterns = null; private RawStore rawStore = null; @@ -154,71 +135,6 @@ static final private Logger LOG = LoggerFactory.getLogger(CachedStore.class.getName()); - static class TableWrapper { - Table t; - String location; - Map parameters; - byte[] sdHash; - TableWrapper(Table t, byte[] sdHash, String location, Map parameters) { - this.t = t; - this.sdHash = sdHash; - this.location = location; - this.parameters = parameters; - } - public Table getTable() { - return t; - } - public byte[] getSdHash() { - return sdHash; - } - public String getLocation() { - return location; - } - public Map getParameters() { - return parameters; - } - } - - static class PartitionWrapper { - Partition p; - String location; - Map parameters; - byte[] sdHash; - PartitionWrapper(Partition p, byte[] sdHash, String location, Map parameters) { - this.p = p; - this.sdHash = sdHash; - this.location = location; - this.parameters = parameters; - } - public Partition getPartition() { - return p; - } - public byte[] getSdHash() { - return sdHash; - } - public String getLocation() { - return location; - } - public 
Map getParameters() { - return parameters; - } - } - - static class StorageDescriptorWrapper { - StorageDescriptor sd; - int refCount = 0; - StorageDescriptorWrapper(StorageDescriptor sd, int refCount) { - this.sd = sd; - this.refCount = refCount; - } - public StorageDescriptor getSd() { - return sd; - } - public int getRefCount() { - return refCount; - } - } - public CachedStore() { } @@ -270,17 +186,9 @@ static void prewarm(RawStore rawStore) throws Exception { SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); for (int i = 0; i < dbNames.size(); i++) { String dbName = StringUtils.normalizeIdentifier(dbNames.get(i)); - // Cache partition column stats - Deadline.startTimer("getColStatsForDatabase"); - List colStatsForDB = - rawStore.getPartitionColStatsForDatabase(dbName); - Deadline.stopTimer(); - if (colStatsForDB != null) { - sharedCache.addPartitionColStatsToCache(colStatsForDB); - } LOG.info("Caching database: {}. Cached {} / {} databases so far.", dbName, i, dbNames.size()); Database db = rawStore.getDatabase(dbName); - sharedCache.addDatabaseToCache(dbName, db); + sharedCache.addDatabaseToCache(db); List tblNames = rawStore.getAllTables(dbName); LOG.debug("Tables in database: {} : {}", dbName, tblNames); for (int j = 0; j < tblNames.size(); j++) { @@ -293,51 +201,54 @@ static void prewarm(RawStore rawStore) throws Exception { tblName, j, tblNames.size()); Table table = null; table = rawStore.getTable(dbName, tblName); + ColumnStatistics tableColStats = null; + List partitions = null; + List partitionColStats = null; + AggrStats aggrStatsAllPartitions = null; + AggrStats aggrStatsAllButDefaultPartition = null; // It is possible the table is deleted during fetching tables of the database, // in that case, continue with the next table if (table == null) { continue; } - sharedCache.addTableToCache(dbName, tblName, table); - if (table.getPartitionKeys() != null && table.getPartitionKeys().size() > 0) { - Deadline.startTimer("getPartitions"); - List partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); - Deadline.stopTimer(); - for (Partition partition : partitions) { - sharedCache.addPartitionToCache(dbName, tblName, partition); - } - } - // Cache table column stats + // Get table column stats List colNames = MetaStoreUtils.getColumnNamesForTable(table); Deadline.startTimer("getTableColumnStatistics"); - ColumnStatistics tableColStats = - rawStore.getTableColumnStatistics(dbName, tblName, colNames); + tableColStats = rawStore.getTableColumnStatistics(dbName, tblName, colNames); Deadline.stopTimer(); - if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) { - sharedCache.addTableColStatsToCache(dbName, tblName, tableColStats.getStatsObj()); - } - // Cache aggregate stats for all partitions of a table and for all but default partition - List partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); - if ((partNames != null) && (partNames.size() > 0)) { - AggrStats aggrStatsAllPartitions = - rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); - // Remove default partition from partition names and get aggregate - // stats again - List partKeys = table.getPartitionKeys(); - String defaultPartitionValue = MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); - List partCols = new ArrayList(); - List partVals = new ArrayList(); - for (FieldSchema fs : partKeys) { - partCols.add(fs.getName()); - partVals.add(defaultPartitionValue); + if (table.isSetPartitionKeys()) { + List partNames = 
rawStore.listPartitionNames(dbName, tblName, (short) -1); + Deadline.startTimer("getPartitions"); + partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); + Deadline.stopTimer(); + // Get partition column stats for this table + Deadline.startTimer("getPartitionColumnStatistics"); + partitionColStats = + rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + if ((partNames != null) && (partNames.size() > 0)) { + // Get aggregate stats for all partitions of a table and for all but default partition + aggrStatsAllPartitions = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + // Remove default partition from partition names and get aggregate + // stats again + List partKeys = table.getPartitionKeys(); + String defaultPartitionValue = + MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); + List partCols = new ArrayList(); + List partVals = new ArrayList(); + for (FieldSchema fs : partKeys) { + partCols.add(fs.getName()); + partVals.add(defaultPartitionValue); + } + String defaultPartitionName = FileUtils.makePartName(partCols, partVals); + partNames.remove(defaultPartitionName); + aggrStatsAllButDefaultPartition = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); } - String defaultPartitionName = FileUtils.makePartName(partCols, partVals); - partNames.remove(defaultPartitionName); - AggrStats aggrStatsAllButDefaultPartition = - rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); - sharedCache.addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions, - aggrStatsAllButDefaultPartition); } + sharedCache.populateTableInCache(table, tableColStats, partitions, partitionColStats, + aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); } } // Notify all blocked threads that prewarm is complete now @@ -453,20 +364,21 @@ public void update() { // Update the database in cache updateDatabases(rawStore, dbNames); for (String dbName : dbNames) { - updateDatabasePartitionColStats(rawStore, dbName); // Update the tables in cache updateTables(rawStore, dbName); - List tblNames = getAllTablesInternal(dbName, sharedCacheWrapper.getUnsafe()); + List tblNames = rawStore.getAllTables(dbName); for (String tblName : tblNames) { if (!shouldCacheTable(dbName, tblName)) { continue; } - // Update the partitions for a table in cache - updateTablePartitions(rawStore, dbName, tblName); // Update the table column stats for a table in cache updateTableColStats(rawStore, dbName, tblName); - // Update aggregate column stats cache - updateAggregateStatsCache(rawStore, dbName, tblName); + // Update the partitions for a table in cache + updateTablePartitions(rawStore, dbName, tblName); + // Update the partition col stats for a table in cache + updateTablePartitionColStats(rawStore, dbName, tblName); + // Update aggregate partition column stats for a table in cache + updateTableAggregatePartitionColStats(rawStore, dbName, tblName); } } } @@ -475,89 +387,7 @@ public void update() { } } - private void updateDatabasePartitionColStats(RawStore rawStore, String dbName) { - try { - Deadline.startTimer("getColStatsForDatabasePartitions"); - List colStatsForDB = - rawStore.getPartitionColStatsForDatabase(dbName); - Deadline.stopTimer(); - if (colStatsForDB != null) { - if (partitionColStatsCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping partition column stats cache 
update; the partition column stats " - + "list we have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe() - .refreshPartitionColStats(StringUtils.normalizeIdentifier(dbName), colStatsForDB); - } - } - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read partitions column stats of database: {}", - dbName, e); - } finally { - if (partitionColStatsCacheLock.isWriteLockedByCurrentThread()) { - partitionColStatsCacheLock.writeLock().unlock(); - } - } - } - - // Update cached aggregate stats for all partitions of a table and for all - // but default partition - private void updateAggregateStatsCache(RawStore rawStore, String dbName, String tblName) { - try { - Table table = rawStore.getTable(dbName, tblName); - List partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); - List colNames = MetaStoreUtils.getColumnNamesForTable(table); - if ((partNames != null) && (partNames.size() > 0)) { - Deadline.startTimer("getAggregareStatsForAllPartitions"); - AggrStats aggrStatsAllPartitions = - rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); - Deadline.stopTimer(); - // Remove default partition from partition names and get aggregate stats again - List partKeys = table.getPartitionKeys(); - String defaultPartitionValue = - MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); - List partCols = new ArrayList(); - List partVals = new ArrayList(); - for (FieldSchema fs : partKeys) { - partCols.add(fs.getName()); - partVals.add(defaultPartitionValue); - } - String defaultPartitionName = FileUtils.makePartName(partCols, partVals); - partNames.remove(defaultPartitionName); - Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault"); - AggrStats aggrStatsAllButDefaultPartition = - rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); - Deadline.stopTimer(); - if ((aggrStatsAllPartitions != null) && (aggrStatsAllButDefaultPartition != null)) { - if (partitionAggrColStatsCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isPartitionAggrColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug( - "Skipping aggregate column stats cache update; the aggregate column stats we " - + "have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe().refreshAggregateStatsCache( - StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), - aggrStatsAllPartitions, aggrStatsAllButDefaultPartition); - } - } - } - } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName, - e); - } finally { - if (partitionAggrColStatsCacheLock.isWriteLockedByCurrentThread()) { - partitionAggrColStatsCacheLock.writeLock().unlock(); - } - } - } - private void updateDatabases(RawStore rawStore, List dbNames) { - // Prepare the list of databases List databases = new ArrayList<>(); for (String dbName : dbNames) { Database db; @@ -568,24 +398,13 @@ private void updateDatabases(RawStore rawStore, List dbNames) { LOG.info("Updating CachedStore: database - " + dbName + " does not exist.", e); } } - // Update the cached database objects - try { - if (databaseCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isDatabaseCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping database cache update; the database list we have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe().refreshDatabases(databases); - } - } finally { 
- if (databaseCacheLock.isWriteLockedByCurrentThread()) { - databaseCacheLock.writeLock().unlock(); - } + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return; } + sharedCache.refreshDatabasesInCache(databases); } - // Update the cached table objects private void updateTables(RawStore rawStore, String dbName) { List tables = new ArrayList<>(); try { @@ -594,81 +413,119 @@ private void updateTables(RawStore rawStore, String dbName) { if (!shouldCacheTable(dbName, tblName)) { continue; } - Table table = - rawStore.getTable(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); + Table table = rawStore.getTable(StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName)); tables.add(table); } - if (tableCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isTableCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping table cache update; the table list we have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe().refreshTables(dbName, tables); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return; } + sharedCache.refreshTablesInCache(dbName, tables); } catch (MetaException e) { - LOG.info("Updating CachedStore: unable to read tables for database - " + dbName, e); - } finally { - if (tableCacheLock.isWriteLockedByCurrentThread()) { - tableCacheLock.writeLock().unlock(); + LOG.debug("Unable to refresh cached tables for database: " + dbName, e); + } + } + + private void updateTableColStats(RawStore rawStore, String dbName, String tblName) { + try { + Table table = rawStore.getTable(dbName, tblName); + if (!table.isSetPartitionKeys()) { + List colNames = MetaStoreUtils.getColumnNamesForTable(table); + Deadline.startTimer("getTableColumnStatistics"); + ColumnStatistics tableColStats = + rawStore.getTableColumnStatistics(dbName, tblName, colNames); + Deadline.stopTimer(); + if (tableColStats != null) { + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return; + } + sharedCache.refreshTableColStatsInCache(StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); + } } + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Unable to refresh table column stats for table: " + tblName, e); } } - // Update the cached partition objects for a table private void updateTablePartitions(RawStore rawStore, String dbName, String tblName) { try { Deadline.startTimer("getPartitions"); List partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); Deadline.stopTimer(); - if (partitionCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isPartitionCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping partition cache update; the partition list we have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe().refreshPartitions( - StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partitions); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return; } + sharedCache.refreshPartitionsInCache(StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), partitions); } catch (MetaException | NoSuchObjectException e) { LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); - } finally { - if 
(partitionCacheLock.isWriteLockedByCurrentThread()) { - partitionCacheLock.writeLock().unlock(); - } } } - // Update the cached col stats for this table - private void updateTableColStats(RawStore rawStore, String dbName, String tblName) { + private void updateTablePartitionColStats(RawStore rawStore, String dbName, String tblName) { try { Table table = rawStore.getTable(dbName, tblName); List colNames = MetaStoreUtils.getColumnNamesForTable(table); - Deadline.startTimer("getTableColumnStatistics"); - ColumnStatistics tableColStats = - rawStore.getTableColumnStatistics(dbName, tblName, colNames); + List partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); + // Get partition column stats for this table + Deadline.startTimer("getPartitionColumnStatistics"); + List partitionColStats = + rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames); Deadline.stopTimer(); - if (tableColStats != null) { - if (tableColStatsCacheLock.writeLock().tryLock()) { - // Skip background updates if we detect change - if (isTableColStatsCacheDirty.compareAndSet(true, false)) { - LOG.debug("Skipping table column stats cache update; the table column stats list we " - + "have is dirty."); - return; - } - sharedCacheWrapper.getUnsafe().refreshTableColStats( - StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); - } + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return; } + sharedCache.refreshPartitionColStatsInCache(dbName, tblName, partitionColStats); } catch (MetaException | NoSuchObjectException e) { - LOG.info("Updating CachedStore: unable to read table column stats of table: " + tblName, e); - } finally { - if (tableColStatsCacheLock.isWriteLockedByCurrentThread()) { - tableColStatsCacheLock.writeLock().unlock(); + LOG.info("Updating CachedStore: unable to read partitions of table: " + tblName, e); + } + } + + // Update cached aggregate stats for all partitions of a table and for all + // but default partition + private void updateTableAggregatePartitionColStats(RawStore rawStore, String dbName, + String tblName) { + try { + Table table = rawStore.getTable(dbName, tblName); + List partNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); + List colNames = MetaStoreUtils.getColumnNamesForTable(table); + if ((partNames != null) && (partNames.size() > 0)) { + Deadline.startTimer("getAggregareStatsForAllPartitions"); + AggrStats aggrStatsAllPartitions = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + // Remove default partition from partition names and get aggregate stats again + List partKeys = table.getPartitionKeys(); + String defaultPartitionValue = + MetastoreConf.getVar(rawStore.getConf(), ConfVars.DEFAULTPARTITIONNAME); + List partCols = new ArrayList(); + List partVals = new ArrayList(); + for (FieldSchema fs : partKeys) { + partCols.add(fs.getName()); + partVals.add(defaultPartitionValue); + } + String defaultPartitionName = FileUtils.makePartName(partCols, partVals); + partNames.remove(defaultPartitionName); + Deadline.startTimer("getAggregareStatsForAllPartitionsExceptDefault"); + AggrStats aggrStatsAllButDefaultPartition = + rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + Deadline.stopTimer(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return; + } + sharedCache.refreshAggregateStatsInCache(StringUtils.normalizeIdentifier(dbName), + 
StringUtils.normalizeIdentifier(tblName), aggrStatsAllPartitions, + aggrStatsAllButDefaultPartition); } + } catch (MetaException | NoSuchObjectException e) { + LOG.info("Updating CachedStore: unable to read aggregate column stats of table: " + tblName, + e); } } } @@ -707,26 +564,18 @@ public void rollbackTransaction() { public void createDatabase(Database db) throws InvalidObjectException, MetaException { rawStore.createDatabase(db); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; - try { - // Wait if background cache update is happening - databaseCacheLock.readLock().lock(); - isDatabaseCacheDirty.set(true); - sharedCache.addDatabaseToCache(StringUtils.normalizeIdentifier(db.getName()), - db.deepCopy()); - } finally { - databaseCacheLock.readLock().unlock(); + if (sharedCache == null) { + return; } + sharedCache.addDatabaseToCache(db); } @Override public Database getDatabase(String dbName) throws NoSuchObjectException { SharedCache sharedCache; - if (!sharedCacheWrapper.isInitialized()) { return rawStore.getDatabase(dbName); } - try { sharedCache = sharedCacheWrapper.get(); } catch (MetaException e) { @@ -744,34 +593,24 @@ public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaExc boolean succ = rawStore.dropDatabase(dbname); if (succ) { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - try { - // Wait if background cache update is happening - databaseCacheLock.readLock().lock(); - isDatabaseCacheDirty.set(true); - sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbname)); - } finally { - databaseCacheLock.readLock().unlock(); + if (sharedCache == null) { + return succ; } + sharedCache.removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbname)); } return succ; } @Override - public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectException, - MetaException { + public boolean alterDatabase(String dbName, Database db) + throws NoSuchObjectException, MetaException { boolean succ = rawStore.alterDatabase(dbName, db); if (succ) { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - try { - // Wait if background cache update is happening - databaseCacheLock.readLock().lock(); - isDatabaseCacheDirty.set(true); - sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(dbName), db); - } finally { - databaseCacheLock.readLock().unlock(); + if (sharedCache == null) { + return succ; } + sharedCache.alterDatabaseInCache(StringUtils.normalizeIdentifier(dbName), db); } return succ; } @@ -782,14 +621,7 @@ public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectExce return rawStore.getDatabases(pattern); } SharedCache sharedCache = sharedCacheWrapper.get(); - List results = new ArrayList<>(); - for (String dbName : sharedCache.listCachedDatabases()) { - dbName = StringUtils.normalizeIdentifier(dbName); - if (CacheUtils.matches(dbName, pattern)) { - results.add(dbName); - } - } - return results; + return sharedCache.listCachedDatabases(pattern); } @Override @@ -842,22 +674,17 @@ public void createTable(Table tbl) throws InvalidObjectException, MetaException if (!shouldCacheTable(dbName, tblName)) { return; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; - validateTableType(tbl); - try { - // Wait if background cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.addTableToCache(dbName, tblName, 
tbl); - } finally { - tableCacheLock.readLock().unlock(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return; } + validateTableType(tbl); + sharedCache.addTableToCache(dbName, tblName, tbl); } @Override - public boolean dropTable(String dbName, String tblName) throws MetaException, - NoSuchObjectException, InvalidObjectException, InvalidInputException { + public boolean dropTable(String dbName, String tblName) + throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { boolean succ = rawStore.dropTable(dbName, tblName); if (succ) { dbName = StringUtils.normalizeIdentifier(dbName); @@ -865,26 +692,11 @@ public boolean dropTable(String dbName, String tblName) throws MetaException, if (!shouldCacheTable(dbName, tblName)) { return succ; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - // Remove table - try { - // Wait if background table cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.removeTableFromCache(dbName, tblName); - } finally { - tableCacheLock.readLock().unlock(); - } - // Remove table col stats - try { - // Wait if background table col stats cache update is happening - tableColStatsCacheLock.readLock().lock(); - isTableColStatsCacheDirty.set(true); - sharedCache.removeTableColStatsFromCache(dbName, tblName); - } finally { - tableColStatsCacheLock.readLock().unlock(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return succ; } + sharedCache.removeTableFromCache(dbName, tblName); } return succ; } @@ -893,11 +705,15 @@ public boolean dropTable(String dbName, String tblName) throws MetaException, public Table getTable(String dbName, String tblName) throws MetaException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { return rawStore.getTable(dbName, tblName); } - SharedCache sharedCache = sharedCacheWrapper.get(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); Table tbl = sharedCache.getTableFromCache(dbName, tblName); + if (tbl == null) { + // This table is not yet loaded in cache + return rawStore.getTable(dbName, tblName); + } if (tbl != null) { tbl.unsetPrivileges(); tbl.setRewriteEnabled(tbl.isRewriteEnabled()); @@ -914,25 +730,11 @@ public boolean addPartition(Partition part) throws InvalidObjectException, MetaE if (!shouldCacheTable(dbName, tblName)) { return succ; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.addPartitionToCache(dbName, tblName, part); - } finally { - partitionCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return succ; } + sharedCache.addPartitionToCache(dbName, tblName, part); } return succ; } @@ -947,27 +749,11 @@ public 
boolean addPartitions(String dbName, String tblName, List part if (!shouldCacheTable(dbName, tblName)) { return succ; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - for (Partition part : parts) { - sharedCache.addPartitionToCache(dbName, tblName, part); - } - } finally { - partitionCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return succ; } + sharedCache.addPartitionsToCache(dbName, tblName, parts); } return succ; } @@ -982,28 +768,14 @@ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy p if (!shouldCacheTable(dbName, tblName)) { return succ; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); - while (iterator.hasNext()) { - Partition part = iterator.next(); - sharedCache.addPartitionToCache(dbName, tblName, part); - } - } finally { - partitionCacheLock.readLock().unlock(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return succ; } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); + PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); + while (iterator.hasNext()) { + Partition part = iterator.next(); + sharedCache.addPartitionToCache(dbName, tblName, part); } } return succ; @@ -1014,16 +786,14 @@ public Partition getPartition(String dbName, String tblName, List part_v throws MetaException, NoSuchObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { return rawStore.getPartition(dbName, tblName, part_vals); } - SharedCache sharedCache = sharedCacheWrapper.get(); - Partition part = - sharedCache.getPartitionFromCache(dbName, tblName, part_vals); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + Partition part = sharedCache.getPartitionFromCache(dbName, tblName, part_vals); if (part == null) { - // TODO Manage privileges - throw new NoSuchObjectException("partition values=" + part_vals.toString()); + // The table containing the partition is not yet loaded in cache + return rawStore.getPartition(dbName, tblName, part_vals); } return part; } @@ -1033,10 +803,15 @@ public boolean doesPartitionExist(String dbName, String tblName, List part_vals) throws MetaException, NoSuchObjectException { dbName = 
StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.doesPartitionExist(dbName, tblName, part_vals);
+    }
+    SharedCache sharedCache = sharedCacheWrapper.getUnsafe();
+    Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+    if (tbl == null) {
+      // The table containing the partition is not yet loaded in cache
       return rawStore.doesPartitionExist(dbName, tblName, part_vals);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     return sharedCache.existPartitionFromCache(dbName, tblName, part_vals);
   }
 
@@ -1050,48 +825,49 @@ public boolean dropPartition(String dbName, String tblName, List part_va
       if (!shouldCacheTable(dbName, tblName)) {
         return succ;
       }
-      SharedCache sharedCache = sharedCacheWrapper.get();
-      if (sharedCache == null) return succ;
-      // Remove partition
-      try {
-        // Wait if background cache update is happening
-        partitionCacheLock.readLock().lock();
-        isPartitionCacheDirty.set(true);
-        sharedCache.removePartitionFromCache(dbName, tblName, part_vals);
-      } finally {
-        partitionCacheLock.readLock().unlock();
-      }
-      // Remove partition col stats
-      try {
-        // Wait if background cache update is happening
-        partitionColStatsCacheLock.readLock().lock();
-        isPartitionColStatsCacheDirty.set(true);
-        sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals);
-      } finally {
-        partitionColStatsCacheLock.readLock().unlock();
-      }
-      // Remove aggregate partition col stats for this table
-      try {
-        // Wait if background cache update is happening
-        partitionAggrColStatsCacheLock.readLock().lock();
-        isPartitionAggrColStatsCacheDirty.set(true);
-        sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName);
-      } finally {
-        partitionAggrColStatsCacheLock.readLock().unlock();
+      SharedCache sharedCache = sharedCacheWrapper.getUnsafe();
+      if (sharedCache == null) {
+        return succ;
       }
+      sharedCache.removePartitionFromCache(dbName, tblName, part_vals);
     }
     return succ;
   }
 
   @Override
+  public void dropPartitions(String dbName, String tblName, List<String> partNames)
+      throws MetaException, NoSuchObjectException {
+    rawStore.dropPartitions(dbName, tblName, partNames);
+    dbName = StringUtils.normalizeIdentifier(dbName);
+    tblName = StringUtils.normalizeIdentifier(tblName);
+    if (!shouldCacheTable(dbName, tblName)) {
+      return;
+    }
+    SharedCache sharedCache = sharedCacheWrapper.getUnsafe();
+    if (sharedCache == null) {
+      return;
+    }
+    List<List<String>> partVals = new ArrayList<List<String>>();
+    for (String partName : partNames) {
+      partVals.add(partNameToVals(partName));
+    }
+    sharedCache.removePartitionsFromCache(dbName, tblName, partVals);
+  }
+
+  @Override
   public List<Partition> getPartitions(String dbName, String tblName, int max)
       throws MetaException, NoSuchObjectException {
     dbName = StringUtils.normalizeIdentifier(dbName);
     tblName = StringUtils.normalizeIdentifier(tblName);
-    if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) {
+    if (!shouldCacheTable(dbName, tblName)) {
+      return rawStore.getPartitions(dbName, tblName, max);
+    }
+    SharedCache sharedCache = sharedCacheWrapper.getUnsafe();
+    Table tbl = sharedCache.getTableFromCache(dbName, tblName);
+    if (tbl == null) {
+      // The table containing the partitions is not yet loaded in cache
       return rawStore.getPartitions(dbName, tblName, max);
     }
-    SharedCache sharedCache = sharedCacheWrapper.get();
     List<Partition> parts = sharedCache.listCachedPartitions(dbName, tblName, max);
     return 
parts; } @@ -1106,71 +882,24 @@ public void alterTable(String dbName, String tblName, Table newTable) if (!shouldCacheTable(dbName, tblName) && !shouldCacheTable(dbName, newTblName)) { return; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; - - if (shouldCacheTable(dbName, newTblName)) { - validateTableType(newTable); - // Update table cache - try { - // Wait if background cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), newTable); - } finally { - tableCacheLock.readLock().unlock(); - } - // Update partition cache (key might have changed since table name is a - // component of key) - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.alterTableInPartitionCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), newTable); - } finally { - partitionCacheLock.readLock().unlock(); - } - } else { - // Remove the table and its cached partitions, stats etc, - // since it does not pass the whitelist/blacklist filter. - // Remove table - try { - // Wait if background cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.removeTableFromCache(dbName, tblName); - } finally { - tableCacheLock.readLock().unlock(); - } - // Remove partitions - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.removePartitionsFromCache(dbName, tblName); - } finally { - partitionCacheLock.readLock().unlock(); - } - // Remove partition col stats - try { - // Wait if background cache update is happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - sharedCache.removePartitionColStatsFromCache(dbName, tblName); - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Update aggregate partition col stats keys wherever applicable - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.alterTableInAggrPartitionColStatsCache(dbName, tblName, newTable); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); - } + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return; + } + Table tbl = sharedCache.getTableFromCache(dbName, tblName); + if (tbl == null) { + // The table is not yet loaded in cache + return; + } + if (shouldCacheTable(dbName, tblName) && shouldCacheTable(dbName, newTblName)) { + // If old table is in the cache and the new table can also be cached + sharedCache.alterTableInCache(dbName, tblName, newTable); + } else if (!shouldCacheTable(dbName, tblName) && shouldCacheTable(dbName, newTblName)) { + // If old table is *not* in the cache but the new table can be cached + sharedCache.addTableToCache(dbName, newTblName, newTable); + } else if (shouldCacheTable(dbName, tblName) && !shouldCacheTable(dbName, newTblName)) { + // If old table is in the cache but the new table *cannot* be cached + sharedCache.removeTableFromCache(dbName, tblName); } } @@ -1179,14 +908,9 @@ public void alterTable(String dbName, String tblName, Table newTable) if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) { return 
rawStore.getTables(dbName, pattern); } - SharedCache sharedCache = sharedCacheWrapper.get(); - List tableNames = new ArrayList<>(); - for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), pattern)) { - tableNames.add(table.getTableName()); - } - } - return tableNames; + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), pattern, + (short) -1); } @Override @@ -1195,15 +919,9 @@ public void alterTable(String dbName, String tblName, Table newTable) if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) { return rawStore.getTables(dbName, pattern, tableType); } - SharedCache sharedCache = sharedCacheWrapper.get(); - List tableNames = new ArrayList<>(); - for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), pattern) && - table.getTableType().equals(tableType.toString())) { - tableNames.add(table.getTableName()); - } - } - return tableNames; + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), pattern, + tableType); } @Override @@ -1219,7 +937,7 @@ public void alterTable(String dbName, String tblName, Table newTable) if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) { return rawStore.getTableMeta(dbNames, tableNames, tableTypes); } - SharedCache sharedCache = sharedCacheWrapper.get(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(dbNames), StringUtils.normalizeIdentifier(tableNames), tableTypes); } @@ -1239,7 +957,7 @@ public void alterTable(String dbName, String tblName, Table newTable) if (!sharedCacheWrapper.isInitialized() || missSomeInCache) { return rawStore.getTableObjectsByName(dbName, tblNames); } - SharedCache sharedCache = sharedCacheWrapper.get(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); List
tables = new ArrayList<>(); for (String tblName : tblNames) { tblName = StringUtils.normalizeIdentifier(tblName); @@ -1257,16 +975,8 @@ public void alterTable(String dbName, String tblName, Table newTable) if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) { return rawStore.getAllTables(dbName); } - SharedCache sharedCache = sharedCacheWrapper.get(); - return getAllTablesInternal(dbName, sharedCache); - } - - private static List getAllTablesInternal(String dbName, SharedCache sharedCache) { - List tblNames = new ArrayList<>(); - for (Table tbl : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - tblNames.add(StringUtils.normalizeIdentifier(tbl.getTableName())); - } - return tblNames; + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName)); } @Override @@ -1275,17 +985,9 @@ public void alterTable(String dbName, String tblName, Table newTable) if (!isBlacklistWhitelistEmpty(conf) || !sharedCacheWrapper.isInitialized()) { return rawStore.listTableNamesByFilter(dbName, filter, max_tables); } - SharedCache sharedCache = sharedCacheWrapper.get(); - List tableNames = new ArrayList<>(); - int count = 0; - for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), filter) - && (max_tables == -1 || count < max_tables)) { - tableNames.add(table.getTableName()); - count++; - } - } - return tableNames; + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + return sharedCache.listCachedTableNames(StringUtils.normalizeIdentifier(dbName), filter, + max_tables); } @Override @@ -1293,16 +995,20 @@ public void alterTable(String dbName, String tblName, Table newTable) short max_parts) throws MetaException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { + return rawStore.listPartitionNames(dbName, tblName, max_parts); + } + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + Table tbl = sharedCache.getTableFromCache(dbName, tblName); + if (tbl == null) { + // The table is not yet loaded in cache return rawStore.listPartitionNames(dbName, tblName, max_parts); } - SharedCache sharedCache = sharedCacheWrapper.get(); List partitionNames = new ArrayList<>(); - Table t = sharedCache.getTableFromCache(dbName, tblName); int count = 0; for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, max_parts)) { if (max_parts == -1 || count < max_parts) { - partitionNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues())); + partitionNames.add(Warehouse.makePartName(tbl.getPartitionKeys(), part.getValues())); } } return partitionNames; @@ -1331,35 +1037,11 @@ public void alterPartition(String dbName, String tblName, List partVals, if (!shouldCacheTable(dbName, tblName)) { return; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; - // Update partition cache - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); - } finally { - partitionCacheLock.readLock().unlock(); - } - // Update partition column stats cache - try { - // Wait if background cache update is happening - 
partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart); - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return; } + sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); } @Override @@ -1371,43 +1053,11 @@ public void alterPartitions(String dbName, String tblName, List> pa if (!shouldCacheTable(dbName, tblName)) { return; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; - // Update partition cache - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - for (int i = 0; i < partValsList.size(); i++) { - List partVals = partValsList.get(i); - Partition newPart = newParts.get(i); - sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); - } - } finally { - partitionCacheLock.readLock().unlock(); - } - // Update partition column stats cache - try { - // Wait if background cache update is happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - for (int i = 0; i < partValsList.size(); i++) { - List partVals = partValsList.get(i); - Partition newPart = newParts.get(i); - sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart); - } - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return; } + sharedCache.alterPartitionsInCache(dbName, tblName, partValsList, newParts); } @Override @@ -1448,18 +1098,18 @@ public void alterIndex(String dbname, String baseTblName, String name, private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr, String defaultPartName, short maxParts, List result, SharedCache sharedCache) - throws MetaException, NoSuchObjectException { - List parts = sharedCache.listCachedPartitions( - StringUtils.normalizeIdentifier(table.getDbName()), - StringUtils.normalizeIdentifier(table.getTableName()), maxParts); + throws MetaException, NoSuchObjectException { + List parts = + sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(table.getDbName()), + StringUtils.normalizeIdentifier(table.getTableName()), maxParts); for (Partition part : parts) { result.add(Warehouse.makePartName(table.getPartitionKeys(), part.getValues())); } if (defaultPartName == null || defaultPartName.isEmpty()) { defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); } - return expressionProxy.filterPartitionsByExpr( - table.getPartitionKeys(), expr, defaultPartName, result); + return 
expressionProxy.filterPartitionsByExpr(table.getPartitionKeys(), expr, defaultPartName, + result); } @Override @@ -1474,13 +1124,18 @@ public boolean getPartitionsByExpr(String dbName, String tblName, byte[] expr, String defaultPartitionName, short maxParts, List result) throws TException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { return rawStore.getPartitionsByExpr(dbName, tblName, expr, defaultPartitionName, maxParts, result); } - SharedCache sharedCache = sharedCacheWrapper.get(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); List partNames = new LinkedList<>(); Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return rawStore.getPartitionsByExpr(dbName, tblName, expr, defaultPartitionName, maxParts, + result); + } boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartitionName, maxParts, partNames, sharedCache); return hasUnknownPartitions; @@ -1497,13 +1152,17 @@ public int getNumPartitionsByExpr(String dbName, String tblName, byte[] expr) throws MetaException, NoSuchObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { return rawStore.getNumPartitionsByExpr(dbName, tblName, expr); } - SharedCache sharedCache = sharedCacheWrapper.get(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); List partNames = new LinkedList<>(); Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return rawStore.getNumPartitionsByExpr(dbName, tblName, expr); + } getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames, sharedCache); return partNames.size(); @@ -1524,10 +1183,15 @@ public int getNumPartitionsByExpr(String dbName, String tblName, byte[] expr) List partNames) throws MetaException, NoSuchObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { + return rawStore.getPartitionsByNames(dbName, tblName, partNames); + } + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache return rawStore.getPartitionsByNames(dbName, tblName, partNames); } - SharedCache sharedCache = sharedCacheWrapper.get(); List partitions = new ArrayList<>(); for (String partName : partNames) { Partition part = sharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName)); @@ -1700,14 +1364,18 @@ public Partition getPartitionWithAuth(String dbName, String tblName, throws MetaException, NoSuchObjectException, InvalidObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { + return 
rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames); + } + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache return rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames); } - SharedCache sharedCache = sharedCacheWrapper.get(); Partition p = sharedCache.getPartitionFromCache(dbName, tblName, partVals); - if (p!=null) { - Table t = sharedCache.getTableFromCache(dbName, tblName); - String partName = Warehouse.makePartName(t.getPartitionKeys(), partVals); + if (p != null) { + String partName = Warehouse.makePartName(table.getPartitionKeys(), partVals); PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, userName, groupNames); p.setPrivileges(privs); @@ -1721,16 +1389,20 @@ public Partition getPartitionWithAuth(String dbName, String tblName, throws MetaException, NoSuchObjectException, InvalidObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { + return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames); + } + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames); } - SharedCache sharedCache = sharedCacheWrapper.get(); - Table t = sharedCache.getTableFromCache(dbName, tblName); List partitions = new ArrayList<>(); int count = 0; for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) { if (maxParts == -1 || count < maxParts) { - String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues()); + String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues()); PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, userName, groupNames); part.setPrivileges(privs); @@ -1747,13 +1419,17 @@ public Partition getPartitionWithAuth(String dbName, String tblName, throws MetaException, NoSuchObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { + return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts); + } + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts); } - SharedCache sharedCache = sharedCacheWrapper.get(); List partNames = new ArrayList<>(); int count = 0; - Table t = sharedCache.getTableFromCache(dbName, tblName); for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) { boolean psMatch = true; for (int i=0;i partitions = new ArrayList<>(); - Table t = sharedCache.getTableFromCache(dbName, tblName); int count = 0; for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) { boolean psMatch = true; @@ -1803,7 +1484,7 @@ public Partition getPartitionWithAuth(String dbName, String tblName, 
continue; } if (maxParts == -1 || count < maxParts) { - String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues()); + String partName = Warehouse.makePartName(table.getPartitionKeys(), part.getValues()); PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, userName, groupNames); part.setPrivileges(privs); @@ -1823,33 +1504,23 @@ public boolean updateTableColumnStatistics(ColumnStatistics colStats) if (!shouldCacheTable(dbName, tblName)) { return succ; } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return succ; + } + Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return succ; + } List statsObjs = colStats.getStatsObj(); - Table tbl = getTable(dbName, tblName); List colNames = new ArrayList<>(); for (ColumnStatisticsObj statsObj : statsObjs) { colNames.add(statsObj.getColName()); } - StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames); - // Update table - try { - // Wait if background cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.alterTableInCache(dbName, tblName, tbl); - } finally { - tableCacheLock.readLock().unlock(); - } - // Update table col stats - try { - // Wait if background cache update is happening - tableColStatsCacheLock.readLock().lock(); - isTableColStatsCacheDirty.set(true); - sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs); - } finally { - tableColStatsCacheLock.readLock().unlock(); - } + StatsSetupConst.setColumnStatsState(table.getParameters(), colNames); + sharedCache.alterTableInCache(dbName, tblName, table); + sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs); } return succ; } @@ -1859,19 +1530,21 @@ public ColumnStatistics getTableColumnStatistics(String dbName, String tblName, List colNames) throws MetaException, NoSuchObjectException { dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { return rawStore.getTableColumnStatistics(dbName, tblName, colNames); } - SharedCache sharedCache = sharedCacheWrapper.get(); - ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName); - List colStatObjs = new ArrayList<>(); - for (String colName : colNames) { - String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, colName); - ColumnStatisticsObj colStat = sharedCache.getCachedTableColStats(colStatsCacheKey); - if (colStat != null) { - colStatObjs.add(colStat); - } + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return rawStore.getTableColumnStatistics(dbName, tblName, colNames); } + Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return rawStore.getTableColumnStatistics(dbName, tblName, colNames); + } + ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName); + List colStatObjs = + sharedCache.getTableColStatsFromCache(dbName, tblName, colNames); if (colStatObjs.isEmpty()) { return null; } else { @@ -1889,16 +1562,11 @@ public boolean deleteTableColumnStatistics(String dbName, String tblName, String if (!shouldCacheTable(dbName, tblName)) { return succ; } - 
SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return succ; - try { - // Wait if background cache update is happening - tableColStatsCacheLock.readLock().lock(); - isTableColStatsCacheDirty.set(true); - sharedCache.removeTableColStatsFromCache(dbName, tblName, colName); - } finally { - tableColStatsCacheLock.readLock().unlock(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + if (sharedCache == null) { + return succ; } + sharedCache.removeTableColStatsFromCache(dbName, tblName, colName); } return succ; } @@ -1913,8 +1581,10 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List statsObjs = colStats.getStatsObj(); Partition part = getPartition(dbName, tblName, partVals); List colNames = new ArrayList<>(); @@ -1922,34 +1592,8 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List List colStats; dbName = StringUtils.normalizeIdentifier(dbName); tblName = StringUtils.normalizeIdentifier(tblName); - if (!sharedCacheWrapper.isInitialized() || !shouldCacheTable(dbName, tblName)) { + if (!shouldCacheTable(dbName, tblName)) { rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); } - SharedCache sharedCache = sharedCacheWrapper.get(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); + Table table = sharedCache.getTableFromCache(dbName, tblName); + if (table == null) { + // The table is not yet loaded in cache + return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); + } List allPartNames = rawStore.listPartitionNames(dbName, tblName, (short) -1); if (partNames.size() == allPartNames.size()) { colStats = sharedCache.getAggrStatsFromCache(dbName, tblName, colNames, StatsType.ALL); @@ -2044,10 +1679,8 @@ private MergedColumnStatsForPartitions mergeColStatsForPartitions(String dbName, List colStatsWithPartInfoList = new ArrayList(); for (String partName : partNames) { - String colStatsCacheKey = - CacheUtils.buildKey(dbName, tblName, partNameToVals(partName), colName); ColumnStatisticsObj colStatsForPart = - sharedCache.getCachedPartitionColStats(colStatsCacheKey); + sharedCache.getPartitionColStatsFromCache(dbName, tblName, partNameToVals(partName), colName); if (colStatsForPart != null) { ColStatsObjWithSourceInfo colStatsWithPartInfo = new ColStatsObjWithSourceInfo(colStatsForPart, dbName, tblName, partName); @@ -2163,52 +1796,6 @@ public void setMetaStoreSchemaVersion(String version, String comment) } @Override - public void dropPartitions(String dbName, String tblName, List partNames) - throws MetaException, NoSuchObjectException { - rawStore.dropPartitions(dbName, tblName, partNames); - dbName = StringUtils.normalizeIdentifier(dbName); - tblName = StringUtils.normalizeIdentifier(tblName); - if (!shouldCacheTable(dbName, tblName)) { - return; - } - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; - // Remove partitions - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - for (String partName : partNames) { - List vals = partNameToVals(partName); - sharedCache.removePartitionFromCache(dbName, tblName, vals); - } - } finally { - partitionCacheLock.readLock().unlock(); - } - // Remove partition col stats - try { - // Wait if background cache update is happening - partitionColStatsCacheLock.readLock().lock(); - isPartitionColStatsCacheDirty.set(true); - for (String partName : partNames) { - List part_vals = partNameToVals(partName); - 
sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals); - } - } finally { - partitionColStatsCacheLock.readLock().unlock(); - } - // Remove aggregate partition col stats for this table - try { - // Wait if background cache update is happening - partitionAggrColStatsCacheLock.readLock().lock(); - isPartitionAggrColStatsCacheDirty.set(true); - sharedCache.removeAggrPartitionColStatsFromCache(dbName, tblName); - } finally { - partitionAggrColStatsCacheLock.readLock().unlock(); - } - } - - @Override public List listPrincipalDBGrantsAll( String principalName, PrincipalType principalType) { return rawStore.listPrincipalDBGrantsAll(principalName, principalType); @@ -2428,7 +2015,7 @@ public int getDatabaseCount() throws MetaException { if (!shouldCacheTable(dbName, tblName)) { return constraintNames; } - SharedCache sharedCache = sharedCacheWrapper.get(); + SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); if (sharedCache == null) return constraintNames; sharedCache.addTableToCache(StringUtils.normalizeIdentifier(tbl.getDbName()), StringUtils.normalizeIdentifier(tbl.getTableName()), tbl); @@ -2470,12 +2057,6 @@ public void dropConstraint(String dbName, String tableName, return rawStore.addNotNullConstraints(nns); } - @Override - public List getPartitionColStatsForDatabase(String dbName) - throws MetaException, NoSuchObjectException { - return rawStore.getPartitionColStatsForDatabase(dbName); - } - public RawStore getRawStore() { return rawStore; } diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java index 32ea17495f..68f4eb4871 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java @@ -25,11 +25,13 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.TreeMap; - +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.hive.metastore.StatObjectConverter; +import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.AggrStats; +import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; @@ -38,11 +40,7 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.hadoop.hive.metastore.api.TableMeta; -import org.apache.hadoop.hive.metastore.cache.CachedStore.PartitionWrapper; -import org.apache.hadoop.hive.metastore.cache.CachedStore.StorageDescriptorWrapper; -import org.apache.hadoop.hive.metastore.cache.CachedStore.TableWrapper; import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils; -import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo; import org.apache.hadoop.hive.metastore.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,15 +48,16 @@ import com.google.common.annotations.VisibleForTesting; public class SharedCache { - private Map databaseCache = new TreeMap<>(); - private Map tableCache = new TreeMap<>(); - private Map partitionCache = new TreeMap<>(); - private Map 
partitionColStatsCache = new TreeMap<>(); - private Map tableColStatsCache = new TreeMap<>(); + private static ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(true); + // For caching Database objects. Key is database name + private Map databaseCache = new HashMap(); + private AtomicBoolean isDatabaseCacheDirty = new AtomicBoolean(false); + // For caching TableWrapper objects. Key is aggregate of database name and table name + private Map tableCache = new HashMap(); + private AtomicBoolean isTableCacheDirty = new AtomicBoolean(false); private Map sdCache = new HashMap<>(); - private Map> aggrColStatsCache = - new HashMap>(); private static MessageDigest md; + static final private Logger LOG = LoggerFactory.getLogger(SharedCache.class.getName()); static enum StatsType { ALL(0), ALLBUTDEFAULT(1); @@ -74,8 +73,6 @@ public int getPosition() { } } - private static final Logger LOG = LoggerFactory.getLogger(SharedCache.class); - static { try { md = MessageDigest.getInstance("MD5"); @@ -84,43 +81,759 @@ public int getPosition() { } } - public synchronized Database getDatabaseFromCache(String name) { - return databaseCache.get(name)!=null?databaseCache.get(name).deepCopy():null; + static class TableWrapper { + Table t; + String location; + Map parameters; + byte[] sdHash; + boolean isLoaded = false; + ReentrantReadWriteLock tableLock = new ReentrantReadWriteLock(true); + // For caching column stats for an unpartitioned table + // Key is column name and the value is the col stat object + private Map tableColStatsCache = + new HashMap(); + private AtomicBoolean isTableColStatsCacheDirty = new AtomicBoolean(false); + // For caching partition objects + // Ket is partition values and the value is a wrapper around the partition object + private Map partitionCache = new HashMap(); + private AtomicBoolean isPartitionCacheDirty = new AtomicBoolean(false); + // For caching column stats for a partitioned table + // Key is aggregate of partition values, column name and the value is the col stat object + private Map partitionColStatsCache = + new HashMap(); + private AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false); + // For caching aggregate column stats for all and all minus default partition + // Key is column name and the value is a list of 2 col stat objects + // (all partitions and all but default) + private Map> aggrColStatsCache = + new HashMap>(); + private AtomicBoolean isAggrPartitionColStatsCacheDirty = new AtomicBoolean(false); + + TableWrapper(Table t, byte[] sdHash, String location, Map parameters) { + this.t = t; + this.sdHash = sdHash; + this.location = location; + this.parameters = parameters; + } + + public Table getTable() { + return t; + } + + public void setTable(Table t) { + this.t = t; + } + + public byte[] getSdHash() { + return sdHash; + } + + public void setSdHash(byte[] sdHash) { + this.sdHash = sdHash; + } + + public String getLocation() { + return location; + } + + public void setLocation(String location) { + this.location = location; + } + + public Map getParameters() { + return parameters; + } + + public void setParameters(Map parameters) { + this.parameters = parameters; + } + + public void markLoaded() { + isLoaded = true; + } + + public boolean isLoaded() { + return isLoaded; + } + + void cachePartition(Partition part, SharedCache sharedCache) { + try { + tableLock.writeLock().lock(); + PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache); + partitionCache.put(CacheUtils.buildKey(part.getValues()), wrapper); + 
isPartitionCacheDirty.set(true); + // Invalidate cached aggregate stats + if (!aggrColStatsCache.isEmpty()) { + aggrColStatsCache.clear(); + } + } finally { + tableLock.writeLock().unlock(); + } + } + + void cachePartitions(List parts, SharedCache sharedCache) { + try { + tableLock.writeLock().lock(); + for (Partition part : parts) { + PartitionWrapper wrapper = makePartitionWrapper(part, sharedCache); + partitionCache.put(CacheUtils.buildKey(part.getValues()), wrapper); + isPartitionCacheDirty.set(true); + } + // Invalidate cached aggregate stats + if (!aggrColStatsCache.isEmpty()) { + aggrColStatsCache.clear(); + } + } finally { + tableLock.writeLock().unlock(); + } + } + + public Partition getPartition(List partVals, SharedCache sharedCache) { + Partition part = null; + try { + tableLock.readLock().lock(); + PartitionWrapper wrapper = partitionCache.get(CacheUtils.buildKey(partVals)); + if (wrapper == null) { + return null; + } + part = CacheUtils.assemble(wrapper, sharedCache); + } finally { + tableLock.readLock().unlock(); + } + return part; + } + + public List listPartitions(int max, SharedCache sharedCache) { + List parts = new ArrayList<>(); + int count = 0; + try { + tableLock.readLock().lock(); + for (PartitionWrapper wrapper : partitionCache.values()) { + if (max == -1 || count < max) { + parts.add(CacheUtils.assemble(wrapper, sharedCache)); + count++; + } + } + } finally { + tableLock.readLock().unlock(); + } + return parts; + } + + public boolean containsPartition(List partVals) { + boolean containsPart = false; + try { + tableLock.readLock().lock(); + containsPart = partitionCache.containsKey(CacheUtils.buildKey(partVals)); + } finally { + tableLock.readLock().unlock(); + } + return containsPart; + } + + public Partition removePartition(List partVal, SharedCache sharedCache) { + Partition part = null; + try { + tableLock.writeLock().lock(); + PartitionWrapper wrapper = partitionCache.remove(CacheUtils.buildKey(partVal)); + isPartitionCacheDirty.set(true); + if (wrapper.getSdHash() != null) { + sharedCache.decrSd(wrapper.getSdHash()); + } + part = CacheUtils.assemble(wrapper, sharedCache); + // Remove col stats + String partialKey = CacheUtils.buildKey(partVal); + Iterator> iterator = + partitionColStatsCache.entrySet().iterator(); + while (iterator.hasNext()) { + Entry entry = iterator.next(); + String key = entry.getKey(); + if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { + iterator.remove(); + } + } + // Invalidate cached aggregate stats + if (!aggrColStatsCache.isEmpty()) { + aggrColStatsCache.clear(); + } + } finally { + tableLock.writeLock().unlock(); + } + return part; + } + + public void removePartitions(List> partVals, SharedCache sharedCache) { + try { + tableLock.writeLock().lock(); + for (List partVal : partVals) { + removePartition(partVal, sharedCache); + } + } finally { + tableLock.writeLock().unlock(); + } + } + + public void alterPartition(List partVals, Partition newPart, SharedCache sharedCache) { + try { + tableLock.writeLock().lock(); + removePartition(partVals, sharedCache); + cachePartition(newPart, sharedCache); + } finally { + tableLock.writeLock().unlock(); + } + } + + public void alterPartitions(List> partValsList, List newParts, + SharedCache sharedCache) { + try { + tableLock.writeLock().lock(); + for (int i = 0; i < partValsList.size(); i++) { + List partVals = partValsList.get(i); + Partition newPart = newParts.get(i); + alterPartition(partVals, newPart, sharedCache); + } + } finally { + tableLock.writeLock().unlock(); + } + } + + 
public void refreshPartitions(List partitions, SharedCache sharedCache) { + Map newPartitionCache = new HashMap(); + try { + tableLock.writeLock().lock(); + for (Partition part : partitions) { + if (isPartitionCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping partition cache update for table: " + getTable().getTableName() + + "; the partition list we have is dirty."); + return; + } + String key = CacheUtils.buildKey(part.getValues()); + PartitionWrapper wrapper = partitionCache.get(key); + if (wrapper != null) { + if (wrapper.getSdHash() != null) { + sharedCache.decrSd(wrapper.getSdHash()); + } + } + wrapper = makePartitionWrapper(part, sharedCache); + newPartitionCache.put(key, wrapper); + } + partitionCache = newPartitionCache; + } finally { + tableLock.writeLock().unlock(); + } + } + + public void updateTableColStats(List colStatsForTable) { + try { + tableLock.writeLock().lock(); + for (ColumnStatisticsObj colStatObj : colStatsForTable) { + // Get old stats object if present + String key = colStatObj.getColName(); + ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key); + if (oldStatsObj != null) { + // Update existing stat object's field + StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj); + } else { + // No stats exist for this key; add a new object to the cache + // TODO: get rid of deepCopy after making sure callers don't use references + tableColStatsCache.put(key, colStatObj.deepCopy()); + } + } + isTableColStatsCacheDirty.set(true); + } finally { + tableLock.writeLock().unlock(); + } + } + + public void refreshTableColStats(List colStatsForTable) { + Map newTableColStatsCache = + new HashMap(); + try { + tableLock.writeLock().lock(); + for (ColumnStatisticsObj colStatObj : colStatsForTable) { + if (isTableColStatsCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping table col stats cache update for table: " + + getTable().getTableName() + "; the table col stats list we have is dirty."); + return; + } + String key = colStatObj.getColName(); + // TODO: get rid of deepCopy after making sure callers don't use references + newTableColStatsCache.put(key, colStatObj.deepCopy()); + } + tableColStatsCache = newTableColStatsCache; + } finally { + tableLock.writeLock().unlock(); + } + } + + public List getCachedTableColStats(List colNames) { + List colStatObjs = new ArrayList(); + try { + tableLock.readLock().lock(); + for (String colName : colNames) { + ColumnStatisticsObj colStatObj = tableColStatsCache.get(colName); + if (colStatObj != null) { + colStatObjs.add(colStatObj); + } + } + } finally { + tableLock.readLock().unlock(); + } + return colStatObjs; + } + + public void removeTableColStats(String colName) { + try { + tableLock.writeLock().lock(); + tableColStatsCache.remove(colName); + isTableColStatsCacheDirty.set(true); + } finally { + tableLock.writeLock().unlock(); + } + } + + public ColumnStatisticsObj getPartitionColStats(List partVal, String colName) { + try { + tableLock.readLock().lock(); + return partitionColStatsCache.get(CacheUtils.buildKey(partVal, colName)); + } finally { + tableLock.readLock().unlock(); + } + } + + public void updatePartitionColStats(List partVal, + List colStatsObjs) { + try { + tableLock.writeLock().lock(); + addPartitionColStatsToCache(partVal, colStatsObjs); + isPartitionColStatsCacheDirty.set(true); + // Invalidate cached aggregate stats + if (!aggrColStatsCache.isEmpty()) { + aggrColStatsCache.clear(); + } + } finally { + tableLock.writeLock().unlock(); + } + } + + public void 
removePartitionColStats(List partVals, String colName) { + try { + tableLock.writeLock().lock(); + partitionColStatsCache.remove(CacheUtils.buildKey(partVals, colName)); + isPartitionColStatsCacheDirty.set(true); + // Invalidate cached aggregate stats + if (!aggrColStatsCache.isEmpty()) { + aggrColStatsCache.clear(); + } + } finally { + tableLock.writeLock().unlock(); + } + } + + private void addPartitionColStatsToCache(List partVal, + List colStatsObjs) { + for (ColumnStatisticsObj colStatObj : colStatsObjs) { + // Get old stats object if present + String key = CacheUtils.buildKey(partVal, colStatObj.getColName()); + ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key); + if (oldStatsObj != null) { + // Update existing stat object's field + StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj); + } else { + // No stats exist for this key; add a new object to the cache + // TODO: get rid of deepCopy after making sure callers don't use references + partitionColStatsCache.put(key, colStatObj.deepCopy()); + } + } + } + + public void refreshPartitionColStats(List partitionColStats) { + Map newPartitionColStatsCache = + new HashMap(); + try { + tableLock.writeLock().lock(); + String tableName = StringUtils.normalizeIdentifier(getTable().getTableName()); + for (ColumnStatistics cs : partitionColStats) { + if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping partition column stats cache update for table: " + + getTable().getTableName() + "; the partition column stats list we have is dirty"); + return; + } + List partVal; + try { + partVal = Warehouse.makeValsFromName(cs.getStatsDesc().getPartName(), null); + List colStatsObjs = cs.getStatsObj(); + for (ColumnStatisticsObj colStatObj : colStatsObjs) { + if (isPartitionColStatsCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping partition column stats cache update for table: " + + getTable().getTableName() + "; the partition column list we have is dirty"); + return; + } + String key = CacheUtils.buildKey(partVal, colStatObj.getColName()); + newPartitionColStatsCache.put(key, colStatObj.deepCopy()); + } + } catch (MetaException e) { + LOG.debug("Unable to cache partition column stats for table: " + tableName, e); + } + } + partitionColStatsCache = newPartitionColStatsCache; + } finally { + tableLock.writeLock().unlock(); + } + } + + public List getAggrPartitionColStats(List colNames, + StatsType statsType) { + List colStats = new ArrayList(); + try { + tableLock.readLock().lock(); + for (String colName : colNames) { + List colStatList = aggrColStatsCache.get(colName); + // If unable to find stats for a column, return null so we can build stats + if (colStatList == null) { + return null; + } + ColumnStatisticsObj colStatObj = colStatList.get(statsType.getPosition()); + // If unable to find stats for this StatsType, return null so we can build stats + if (colStatObj == null) { + return null; + } + colStats.add(colStatObj); + } + } finally { + tableLock.readLock().unlock(); + } + return colStats; + } + + public void cacheAggrPartitionColStats(AggrStats aggrStatsAllPartitions, + AggrStats aggrStatsAllButDefaultPartition) { + try { + tableLock.writeLock().lock(); + if (aggrStatsAllPartitions != null) { + for (ColumnStatisticsObj statObj : aggrStatsAllPartitions.getColStats()) { + if (statObj != null) { + List aggrStats = new ArrayList(); + aggrStats.add(StatsType.ALL.ordinal(), statObj.deepCopy()); + aggrColStatsCache.put(statObj.getColName(), aggrStats); + } + } + } + if 
(aggrStatsAllButDefaultPartition != null) { + for (ColumnStatisticsObj statObj : aggrStatsAllButDefaultPartition.getColStats()) { + if (statObj != null) { + List aggrStats = aggrColStatsCache.get(statObj.getColName()); + if (aggrStats == null) { + aggrStats = new ArrayList(); + } + aggrStats.add(StatsType.ALLBUTDEFAULT.ordinal(), statObj.deepCopy()); + } + } + } + isAggrPartitionColStatsCacheDirty.set(true); + } finally { + tableLock.writeLock().unlock(); + } + } + + public void refreshAggrPartitionColStats(AggrStats aggrStatsAllPartitions, + AggrStats aggrStatsAllButDefaultPartition) { + Map> newAggrColStatsCache = + new HashMap>(); + try { + tableLock.writeLock().lock(); + if (aggrStatsAllPartitions != null) { + for (ColumnStatisticsObj statObj : aggrStatsAllPartitions.getColStats()) { + if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping aggregate stats cache update for table: " + + getTable().getTableName() + "; the aggregate stats list we have is dirty"); + return; + } + if (statObj != null) { + List aggrStats = new ArrayList(); + aggrStats.add(StatsType.ALL.ordinal(), statObj.deepCopy()); + newAggrColStatsCache.put(statObj.getColName(), aggrStats); + } + } + } + if (aggrStatsAllButDefaultPartition != null) { + for (ColumnStatisticsObj statObj : aggrStatsAllButDefaultPartition.getColStats()) { + if (isAggrPartitionColStatsCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping aggregate stats cache update for table: " + + getTable().getTableName() + "; the aggregate stats list we have is dirty"); + return; + } + if (statObj != null) { + List aggrStats = newAggrColStatsCache.get(statObj.getColName()); + if (aggrStats == null) { + aggrStats = new ArrayList(); + } + aggrStats.add(StatsType.ALLBUTDEFAULT.ordinal(), statObj.deepCopy()); + } + } + } + aggrColStatsCache = newAggrColStatsCache; + } finally { + tableLock.writeLock().unlock(); + } + } + + private void updateTableObj(Table newTable, SharedCache sharedCache) { + byte[] sdHash = getSdHash(); + // Remove old table object's sd hash + if (sdHash != null) { + sharedCache.decrSd(sdHash); + } + Table tblCopy = newTable.deepCopy(); + if (tblCopy.getPartitionKeys() != null) { + for (FieldSchema fs : tblCopy.getPartitionKeys()) { + fs.setName(StringUtils.normalizeIdentifier(fs.getName())); + } + } + setTable(tblCopy); + if (tblCopy.getSd() != null) { + sdHash = MetaStoreUtils.hashStorageDescriptor(tblCopy.getSd(), md); + StorageDescriptor sd = tblCopy.getSd(); + sharedCache.increSd(sd, sdHash); + tblCopy.setSd(null); + setSdHash(sdHash); + setLocation(sd.getLocation()); + setParameters(sd.getParameters()); + } else { + setSdHash(null); + setLocation(null); + setParameters(null); + } + } + + private PartitionWrapper makePartitionWrapper(Partition part, SharedCache sharedCache) { + Partition partCopy = part.deepCopy(); + PartitionWrapper wrapper; + if (part.getSd() != null) { + byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(part.getSd(), md); + StorageDescriptor sd = part.getSd(); + sharedCache.increSd(sd, sdHash); + partCopy.setSd(null); + wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters()); + } else { + wrapper = new PartitionWrapper(partCopy, null, null, null); + } + return wrapper; + } } - public synchronized void addDatabaseToCache(String dbName, Database db) { - Database dbCopy = db.deepCopy(); - dbCopy.setName(StringUtils.normalizeIdentifier(dbName)); - databaseCache.put(dbName, dbCopy); + static class PartitionWrapper { + Partition p; + String 
location; + Map parameters; + byte[] sdHash; + + PartitionWrapper(Partition p, byte[] sdHash, String location, Map parameters) { + this.p = p; + this.sdHash = sdHash; + this.location = location; + this.parameters = parameters; + } + + public Partition getPartition() { + return p; + } + + public byte[] getSdHash() { + return sdHash; + } + + public String getLocation() { + return location; + } + + public Map getParameters() { + return parameters; + } } - public synchronized void removeDatabaseFromCache(String dbName) { - databaseCache.remove(dbName); + static class StorageDescriptorWrapper { + StorageDescriptor sd; + int refCount = 0; + + StorageDescriptorWrapper(StorageDescriptor sd, int refCount) { + this.sd = sd; + this.refCount = refCount; + } + + public StorageDescriptor getSd() { + return sd; + } + + public int getRefCount() { + return refCount; + } + } + + public Database getDatabaseFromCache(String name) { + Database db = null; + try { + cacheLock.readLock().lock(); + if (databaseCache.get(name) != null) { + db = databaseCache.get(name).deepCopy(); + } + } finally { + cacheLock.readLock().unlock(); + } + return db; + } + + public void addDatabaseToCache(Database db) { + try { + cacheLock.writeLock().lock(); + Database dbCopy = db.deepCopy(); + databaseCache.put(StringUtils.normalizeIdentifier(dbCopy.getName()), dbCopy); + isDatabaseCacheDirty.set(true); + } finally { + cacheLock.writeLock().unlock(); + } } - public synchronized List listCachedDatabases() { - return new ArrayList<>(databaseCache.keySet()); + public void removeDatabaseFromCache(String dbName) { + try { + cacheLock.writeLock().lock(); + databaseCache.remove(dbName); + isDatabaseCacheDirty.set(true); + } finally { + cacheLock.writeLock().unlock(); + } } - public synchronized void alterDatabaseInCache(String dbName, Database newDb) { - removeDatabaseFromCache(StringUtils.normalizeIdentifier(dbName)); - addDatabaseToCache(StringUtils.normalizeIdentifier(newDb.getName()), newDb.deepCopy()); + public List listCachedDatabases() { + List results = new ArrayList<>(); + try { + cacheLock.readLock().lock(); + results.addAll(databaseCache.keySet()); + } finally { + cacheLock.readLock().unlock(); + } + return results; } - public synchronized int getCachedDatabaseCount() { - return databaseCache.size(); + public List listCachedDatabases(String pattern) { + List results = new ArrayList<>(); + try { + cacheLock.readLock().lock(); + for (String dbName : databaseCache.keySet()) { + dbName = StringUtils.normalizeIdentifier(dbName); + if (CacheUtils.matches(dbName, pattern)) { + results.add(dbName); + } + } + } finally { + cacheLock.readLock().unlock(); + } + return results; } - public synchronized Table getTableFromCache(String dbName, String tableName) { - TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName)); - if (tblWrapper == null) { - return null; + public void alterDatabaseInCache(String dbName, Database newDb) { + try { + cacheLock.writeLock().lock(); + removeDatabaseFromCache(dbName); + addDatabaseToCache(newDb.deepCopy()); + isDatabaseCacheDirty.set(true); + } finally { + cacheLock.writeLock().unlock(); + } + } + + public void refreshDatabasesInCache(List databases) { + if (isDatabaseCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping database cache update; the database list we have is dirty."); + return; + } + try { + cacheLock.writeLock().lock(); + databaseCache.clear(); + for (Database db : databases) { + addDatabaseToCache(db.deepCopy()); + } + } finally { + 
cacheLock.writeLock().unlock(); + } + } + + public int getCachedDatabaseCount() { + try { + cacheLock.readLock().lock(); + return databaseCache.size(); + } finally { + cacheLock.readLock().unlock(); + } + } + + public void populateTableInCache(Table table, ColumnStatistics tableColStats, + List partitions, List partitionColStats, + AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) { + String dbName = StringUtils.normalizeIdentifier(table.getDbName()); + String tableName = StringUtils.normalizeIdentifier(table.getTableName()); + addTableToCache(dbName, tableName, table); + if (!table.isSetPartitionKeys()) { + updateTableColStatsInCache(dbName, tableName, tableColStats.getStatsObj()); + } else { + addPartitionsToCache(dbName, tableName, partitions); + for (ColumnStatistics cs : partitionColStats) { + List partVal; + try { + partVal = Warehouse.makeValsFromName(cs.getStatsDesc().getPartName(), null); + List colStats = cs.getStatsObj(); + updatePartitionColStatsInCache(dbName, tableName, partVal, colStats); + } catch (MetaException e) { + LOG.debug("Unable to cache partition column stats for table: " + tableName, e); + } + } + addAggregateStatsToCache(dbName, tableName, aggrStatsAllPartitions, + aggrStatsAllButDefaultPartition); + // Once table's data is fully loaded in cache, mark it as loaded + markLoaded(dbName, tableName); + } + } + + private void markLoaded(String dbName, String tableName) { + try { + cacheLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName)); + if (tblWrapper != null) { + tblWrapper.markLoaded(); + } + } finally { + cacheLock.writeLock().unlock(); + } + } + + public Table getTableFromCache(String dbName, String tableName) { + Table t = null; + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName)); + if ((tblWrapper != null) && (tblWrapper.isLoaded())) { + t = CacheUtils.assemble(tblWrapper, this); + } + } finally { + cacheLock.readLock().unlock(); } - Table t = CacheUtils.assemble(tblWrapper, this); return t; } - public synchronized void addTableToCache(String dbName, String tblName, Table tbl) { + public TableWrapper addTableToCache(String dbName, String tblName, Table tbl) { + try { + cacheLock.writeLock().lock(); + TableWrapper wrapper = createTableWrapper(dbName, tblName, tbl); + tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper); + isTableCacheDirty.set(true); + return wrapper; + } finally { + cacheLock.writeLock().unlock(); + } + } + + private TableWrapper createTableWrapper(String dbName, String tblName, Table tbl) { + TableWrapper wrapper; Table tblCopy = tbl.deepCopy(); tblCopy.setDbName(StringUtils.normalizeIdentifier(dbName)); tblCopy.setTableName(StringUtils.normalizeIdentifier(tblName)); @@ -129,7 +842,6 @@ public synchronized void addTableToCache(String dbName, String tblName, Table tb fs.setName(StringUtils.normalizeIdentifier(fs.getName())); } } - TableWrapper wrapper; if (tbl.getSd() != null) { byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(tbl.getSd(), md); StorageDescriptor sd = tbl.getSd(); @@ -139,481 +851,448 @@ public synchronized void addTableToCache(String dbName, String tblName, Table tb } else { wrapper = new TableWrapper(tblCopy, null, null, null); } - tableCache.put(CacheUtils.buildKey(dbName, tblName), wrapper); + return wrapper; } - public synchronized void removeTableFromCache(String dbName, String tblName) { - TableWrapper tblWrapper = 
tableCache.remove(CacheUtils.buildKey(dbName, tblName)); - byte[] sdHash = tblWrapper.getSdHash(); - if (sdHash!=null) { - decrSd(sdHash); + public void removeTableFromCache(String dbName, String tblName) { + try { + cacheLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName)); + byte[] sdHash = tblWrapper.getSdHash(); + if (sdHash != null) { + decrSd(sdHash); + } + isTableCacheDirty.set(true); + } finally { + cacheLock.writeLock().unlock(); } } - public synchronized ColumnStatisticsObj getCachedTableColStats(String colStatsCacheKey) { - return tableColStatsCache.get(colStatsCacheKey)!=null?tableColStatsCache.get(colStatsCacheKey).deepCopy():null; - } - - public synchronized void removeTableColStatsFromCache(String dbName, String tblName) { - String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); - Iterator> iterator = - tableColStatsCache.entrySet().iterator(); - while (iterator.hasNext()) { - Entry entry = iterator.next(); - String key = entry.getKey(); - if (key.toLowerCase().startsWith(partialKey.toLowerCase())) { - iterator.remove(); + public void alterTableInCache(String dbName, String tblName, Table newTable) { + try { + cacheLock.writeLock().lock(); + TableWrapper tblWrapper = tableCache.remove(CacheUtils.buildKey(dbName, tblName)); + if ((tblWrapper != null) && (tblWrapper.isLoaded())) { + tblWrapper.updateTableObj(newTable, this); + String newDbName = StringUtils.normalizeIdentifier(newTable.getDbName()); + String newTblName = StringUtils.normalizeIdentifier(newTable.getTableName()); + tableCache.put(CacheUtils.buildKey(newDbName, newTblName), tblWrapper); + isTableCacheDirty.set(true); } + } finally { + cacheLock.writeLock().unlock(); } } - public synchronized void removeTableColStatsFromCache(String dbName, String tblName, - String colName) { - if (colName == null) { - removeTableColStatsFromCache(dbName, tblName); - } else { - tableColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, colName)); + public List
listCachedTables(String dbName) { + List
tables = new ArrayList<>(); + try { + cacheLock.readLock().lock(); + for (TableWrapper wrapper : tableCache.values()) { + if (wrapper.getTable().getDbName().equals(dbName) && wrapper.isLoaded()) { + tables.add(CacheUtils.assemble(wrapper, this)); + } + } + } finally { + cacheLock.readLock().unlock(); } + return tables; } - public synchronized void updateTableColStatsInCache(String dbName, String tableName, - List colStatsForTable) { - for (ColumnStatisticsObj colStatObj : colStatsForTable) { - // Get old stats object if present - String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName()); - ColumnStatisticsObj oldStatsObj = tableColStatsCache.get(key); - if (oldStatsObj != null) { - LOG.debug("CachedStore: updating table column stats for column: " + colStatObj.getColName() - + ", of table: " + tableName + " and database: " + dbName); - // Update existing stat object's field - StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj); - } else { - // No stats exist for this key; add a new object to the cache - tableColStatsCache.put(key, colStatObj); + public List listCachedTableNames(String dbName) { + List tableNames = new ArrayList<>(); + try { + cacheLock.readLock().lock(); + for (TableWrapper wrapper : tableCache.values()) { + if (wrapper.getTable().getDbName().equals(dbName) && wrapper.isLoaded()) { + tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName())); + } } + } finally { + cacheLock.readLock().unlock(); } + return tableNames; } - public synchronized void alterTableInCache(String dbName, String tblName, Table newTable) { - removeTableFromCache(dbName, tblName); - addTableToCache(StringUtils.normalizeIdentifier(newTable.getDbName()), - StringUtils.normalizeIdentifier(newTable.getTableName()), newTable); + public List listCachedTableNames(String dbName, String pattern, short maxTables) { + List tableNames = new ArrayList(); + try { + cacheLock.readLock().lock(); + int count = 0; + for (TableWrapper wrapper : tableCache.values()) { + if ((wrapper.getTable().getDbName().equals(dbName)) + && CacheUtils.matches(wrapper.getTable().getTableName(), pattern) + && (maxTables == -1 || count < maxTables) && wrapper.isLoaded()) { + tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName())); + count++; + } + } + } finally { + cacheLock.readLock().unlock(); + } + return tableNames; } - public synchronized void alterTableInPartitionCache(String dbName, String tblName, - Table newTable) { - if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { - List partitions = listCachedPartitions(dbName, tblName, -1); - for (Partition part : partitions) { - removePartitionFromCache(part.getDbName(), part.getTableName(), part.getValues()); - part.setDbName(StringUtils.normalizeIdentifier(newTable.getDbName())); - part.setTableName(StringUtils.normalizeIdentifier(newTable.getTableName())); - addPartitionToCache(StringUtils.normalizeIdentifier(newTable.getDbName()), - StringUtils.normalizeIdentifier(newTable.getTableName()), part); + public List listCachedTableNames(String dbName, String pattern, TableType tableType) { + List tableNames = new ArrayList(); + try { + cacheLock.readLock().lock(); + for (TableWrapper wrapper : tableCache.values()) { + if ((wrapper.getTable().getDbName().equals(dbName)) + && CacheUtils.matches(wrapper.getTable().getTableName(), pattern) + && wrapper.getTable().getTableType().equals(tableType.toString()) + && wrapper.isLoaded()) { + 
tableNames.add(StringUtils.normalizeIdentifier(wrapper.getTable().getTableName())); + } } + } finally { + cacheLock.readLock().unlock(); } + return tableNames; } - public synchronized void alterTableInTableColStatsCache(String dbName, String tblName, - Table newTable) { - if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { - String oldPartialTableStatsKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); - Iterator> iterator = - tableColStatsCache.entrySet().iterator(); - Map newTableColStats = - new HashMap<>(); - while (iterator.hasNext()) { - Entry entry = iterator.next(); - String key = entry.getKey(); - ColumnStatisticsObj colStatObj = entry.getValue(); - if (key.toLowerCase().startsWith(oldPartialTableStatsKey.toLowerCase())) { - String[] decomposedKey = CacheUtils.splitTableColStats(key); - String newKey = CacheUtils.buildKey(decomposedKey[0], decomposedKey[1], decomposedKey[2]); - newTableColStats.put(newKey, colStatObj); - iterator.remove(); + public void refreshTablesInCache(String dbName, List
tables) { + if (isTableCacheDirty.compareAndSet(true, false)) { + LOG.debug("Skipping table cache update; the table list we have is dirty."); + return; + } + try { + cacheLock.writeLock().lock(); + Map newTableCache = new HashMap(); + for (Table tbl : tables) { + String tblName = StringUtils.normalizeIdentifier(tbl.getTableName()); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if ((tblWrapper != null) && (tblWrapper.isLoaded())) { + tblWrapper.updateTableObj(tbl, this); + } else { + tblWrapper = createTableWrapper(dbName, tblName, tbl); + tblWrapper.markLoaded(); } + newTableCache.put(CacheUtils.buildKey(dbName, tblName), tblWrapper); } - tableColStatsCache.putAll(newTableColStats); + tableCache.clear(); + tableCache = newTableCache; + } finally { + cacheLock.writeLock().unlock(); } } - public synchronized void alterTableInPartitionColStatsCache(String dbName, String tblName, - Table newTable) { - if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { - List partitions = listCachedPartitions(dbName, tblName, -1); - Map newPartitionColStats = new HashMap<>(); - for (Partition part : partitions) { - String oldPartialPartitionKey = - CacheUtils.buildKeyWithDelimit(dbName, tblName, part.getValues()); - Iterator> iterator = - partitionColStatsCache.entrySet().iterator(); - while (iterator.hasNext()) { - Entry entry = iterator.next(); - String key = entry.getKey(); - ColumnStatisticsObj colStatObj = entry.getValue(); - if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) { - Object[] decomposedKey = CacheUtils.splitPartitionColStats(key); - // New key has the new table name - String newKey = CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(), - (List) decomposedKey[2], (String) decomposedKey[3]); - newPartitionColStats.put(newKey, colStatObj); - iterator.remove(); - } - } + public List getTableColStatsFromCache(String dbName, String tblName, + List colNames) { + List colStatObjs = new ArrayList(); + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if ((tblWrapper != null) && (tblWrapper.isLoaded())) { + colStatObjs = tblWrapper.getCachedTableColStats(colNames); } - partitionColStatsCache.putAll(newPartitionColStats); + } finally { + cacheLock.readLock().unlock(); } + return colStatObjs; } - public synchronized void alterTableInAggrPartitionColStatsCache(String dbName, String tblName, - Table newTable) { - if (!dbName.equals(newTable.getDbName()) || !tblName.equals(newTable.getTableName())) { - Map> newAggrColStatsCache = - new HashMap>(); - String oldPartialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName); - Iterator>> iterator = - aggrColStatsCache.entrySet().iterator(); - while (iterator.hasNext()) { - Entry> entry = iterator.next(); - String key = entry.getKey(); - List value = entry.getValue(); - if (key.toLowerCase().startsWith(oldPartialKey.toLowerCase())) { - Object[] decomposedKey = CacheUtils.splitAggrColStats(key); - // New key has the new table name - String newKey = CacheUtils.buildKey((String) decomposedKey[0], newTable.getTableName(), - (String) decomposedKey[2]); - newAggrColStatsCache.put(newKey, value); - iterator.remove(); - } + public void removeTableColStatsFromCache(String dbName, String tblName, String colName) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if ((tblWrapper != null) && (tblWrapper.isLoaded())) { + 
tblWrapper.removeTableColStats(colName); } - aggrColStatsCache.putAll(newAggrColStatsCache); + } finally { + cacheLock.readLock().unlock(); } } - public synchronized int getCachedTableCount() { - return tableCache.size(); + public void updateTableColStatsInCache(String dbName, String tableName, + List colStatsForTable) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName)); + if ((tblWrapper != null) && (tblWrapper.isLoaded())) { + tblWrapper.updateTableColStats(colStatsForTable); + } + } finally { + cacheLock.readLock().unlock(); + } } - public synchronized List
listCachedTables(String dbName) { - List
tables = new ArrayList<>(); - for (TableWrapper wrapper : tableCache.values()) { - if (wrapper.getTable().getDbName().equals(dbName)) { - tables.add(CacheUtils.assemble(wrapper, this)); + public void refreshTableColStatsInCache(String dbName, String tableName, + List colStatsForTable) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName)); + if ((tblWrapper != null) && (tblWrapper.isLoaded())) { + tblWrapper.refreshTableColStats(colStatsForTable); } + } finally { + cacheLock.readLock().unlock(); + } + } + + public int getCachedTableCount() { + try { + cacheLock.readLock().lock(); + return tableCache.size(); + } finally { + cacheLock.readLock().unlock(); } - return tables; } - public synchronized List getTableMeta(String dbNames, String tableNames, List tableTypes) { + public List getTableMeta(String dbNames, String tableNames, + List tableTypes) { List tableMetas = new ArrayList<>(); - for (String dbName : listCachedDatabases()) { - if (CacheUtils.matches(dbName, dbNames)) { - for (Table table : listCachedTables(dbName)) { - if (CacheUtils.matches(table.getTableName(), tableNames)) { - if (tableTypes==null || tableTypes.contains(table.getTableType())) { - TableMeta metaData = new TableMeta( - dbName, table.getTableName(), table.getTableType()); + try { + cacheLock.readLock().lock(); + for (String dbName : listCachedDatabases()) { + if (CacheUtils.matches(dbName, dbNames)) { + for (Table table : listCachedTables(dbName)) { + if (CacheUtils.matches(table.getTableName(), tableNames)) { + if (tableTypes == null || tableTypes.contains(table.getTableType())) { + TableMeta metaData = + new TableMeta(dbName, table.getTableName(), table.getTableType()); metaData.setComments(table.getParameters().get("comment")); tableMetas.add(metaData); + } } } } } + } finally { + cacheLock.readLock().unlock(); } return tableMetas; } - public synchronized void addPartitionToCache(String dbName, String tblName, Partition part) { - Partition partCopy = part.deepCopy(); - PartitionWrapper wrapper; - if (part.getSd()!=null) { - byte[] sdHash = MetaStoreUtils.hashStorageDescriptor(part.getSd(), md); - StorageDescriptor sd = part.getSd(); - increSd(sd, sdHash); - partCopy.setSd(null); - wrapper = new PartitionWrapper(partCopy, sdHash, sd.getLocation(), sd.getParameters()); - } else { - wrapper = new PartitionWrapper(partCopy, null, null, null); + public void addPartitionToCache(String dbName, String tblName, Partition part) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if ((tblWrapper != null) && (tblWrapper.isLoaded())) { + tblWrapper.cachePartition(part, this); + } + } finally { + cacheLock.readLock().unlock(); } - partitionCache.put(CacheUtils.buildKey(dbName, tblName, part.getValues()), wrapper); } - public synchronized Partition getPartitionFromCache(String key) { - PartitionWrapper wrapper = partitionCache.get(key); - if (wrapper == null) { - return null; + public void addPartitionsToCache(String dbName, String tblName, List parts) { + try { + cacheLock.readLock().lock(); + TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName)); + if ((tblWrapper != null) && (tblWrapper.isLoaded())) { + tblWrapper.cachePartitions(parts, this); + } + } finally { + cacheLock.readLock().unlock(); } - Partition p = CacheUtils.assemble(wrapper, this); - return p; - } - - public synchronized Partition getPartitionFromCache(String dbName, String tblName, List 
part_vals) {
-    return getPartitionFromCache(CacheUtils.buildKey(dbName, tblName, part_vals));
-  }
-
-  public synchronized boolean existPartitionFromCache(String dbName, String tblName, List<String> part_vals) {
-    return partitionCache.containsKey(CacheUtils.buildKey(dbName, tblName, part_vals));
   }
-  public synchronized Partition removePartitionFromCache(String dbName, String tblName,
-      List<String> part_vals) {
-    PartitionWrapper wrapper =
-        partitionCache.remove(CacheUtils.buildKey(dbName, tblName, part_vals));
-    if (wrapper.getSdHash() != null) {
-      decrSd(wrapper.getSdHash());
+  public Partition getPartitionFromCache(String dbName, String tblName,
+      List<String> partVals) {
+    Partition part = null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName));
+      if ((tblWrapper != null) && (tblWrapper.isLoaded())) {
+        part = tblWrapper.getPartition(partVals, this);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    return wrapper.getPartition();
+    return part;
   }
-  /**
-   * Given a db + table, remove all partitions for this table from the cache
-   * @param dbName
-   * @param tblName
-   * @return
-   */
-  public synchronized void removePartitionsFromCache(String dbName, String tblName) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
-    Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, PartitionWrapper> entry = iterator.next();
-      String key = entry.getKey();
-      PartitionWrapper wrapper = entry.getValue();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
-        if (wrapper.getSdHash() != null) {
-          decrSd(wrapper.getSdHash());
-        }
+  public boolean existPartitionFromCache(String dbName, String tblName, List<String> partVals) {
+    boolean existsPart = false;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName));
+      if ((tblWrapper != null) && (tblWrapper.isLoaded())) {
+        existsPart = tblWrapper.containsPartition(partVals);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return existsPart;
   }
-  // Remove cached column stats for all partitions of all tables in a db
-  public synchronized void removePartitionColStatsFromCache(String dbName) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName);
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        partitionColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String key = entry.getKey();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
+  public Partition removePartitionFromCache(String dbName, String tblName,
+      List<String> partVals) {
+    Partition part = null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName));
+      if ((tblWrapper != null) && (tblWrapper.isLoaded())) {
+        part = tblWrapper.removePartition(partVals, this);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return part;
   }
-  // Remove cached column stats for all partitions of a table
-  public synchronized void removePartitionColStatsFromCache(String dbName, String tblName) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        partitionColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String key = entry.getKey();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
+  public void removePartitionsFromCache(String dbName, String tblName,
+      List<List<String>> partVals) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName));
+      if ((tblWrapper != null) && (tblWrapper.isLoaded())) {
+        tblWrapper.removePartitions(partVals, this);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
-  // Remove cached column stats for a particular partition of a table
-  public synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
-      List<String> partVals) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals);
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        partitionColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String key = entry.getKey();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
+  public List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
+    List<Partition> parts = new ArrayList<Partition>();
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName));
+      if ((tblWrapper != null) && (tblWrapper.isLoaded())) {
+        parts = tblWrapper.listPartitions(max, this);
      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
+    return parts;
   }
-  // Remove cached column stats for a particular partition and a particular column of a table
-  public synchronized void removePartitionColStatsFromCache(String dbName, String tblName,
-      List<String> partVals, String colName) {
-    partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName));
+  public void alterPartitionInCache(String dbName, String tblName, List<String> partVals,
+      Partition newPart) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName));
+      if ((tblWrapper != null) && (tblWrapper.isLoaded())) {
+        tblWrapper.alterPartition(partVals, newPart, this);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
   }
-  public synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
-    List<Partition> partitions = new ArrayList<>();
-    int count = 0;
-    for (PartitionWrapper wrapper : partitionCache.values()) {
-      if (wrapper.getPartition().getDbName().equals(dbName)
-          && wrapper.getPartition().getTableName().equals(tblName)
-          && (max == -1 || count < max)) {
-        partitions.add(CacheUtils.assemble(wrapper, this));
-        count++;
+  public void alterPartitionsInCache(String dbName, String tblName, List<List<String>> partValsList,
+      List<Partition> newParts) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName));
+      if ((tblWrapper != null) && (tblWrapper.isLoaded())) {
+        tblWrapper.alterPartitions(partValsList, newParts, this);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    return partitions;
   }
-  public synchronized void alterPartitionInCache(String dbName, String tblName,
-      List<String> partVals, Partition newPart) {
-    removePartitionFromCache(dbName, tblName, partVals);
-    addPartitionToCache(StringUtils.normalizeIdentifier(newPart.getDbName()),
-        StringUtils.normalizeIdentifier(newPart.getTableName()), newPart);
+  public void refreshPartitionsInCache(String dbName, String tblName, List<Partition> partitions) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName));
+      if ((tblWrapper != null) && (tblWrapper.isLoaded())) {
+        tblWrapper.refreshPartitions(partitions, this);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
+    }
   }
-  public synchronized void alterPartitionInColStatsCache(String dbName, String tblName,
-      List<String> partVals, Partition newPart) {
-    String oldPartialPartitionKey = CacheUtils.buildKeyWithDelimit(dbName, tblName, partVals);
-    Map<String, ColumnStatisticsObj> newPartitionColStats = new HashMap<>();
-    Iterator<Entry<String, ColumnStatisticsObj>> iterator =
-        partitionColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, ColumnStatisticsObj> entry = iterator.next();
-      String key = entry.getKey();
-      ColumnStatisticsObj colStatObj = entry.getValue();
-      if (key.toLowerCase().startsWith(oldPartialPartitionKey.toLowerCase())) {
-        Object[] decomposedKey = CacheUtils.splitPartitionColStats(key);
-        String newKey =
-            CacheUtils.buildKey(StringUtils.normalizeIdentifier(newPart.getDbName()),
-                StringUtils.normalizeIdentifier(newPart.getTableName()), newPart.getValues(),
-                (String) decomposedKey[3]);
-        newPartitionColStats.put(newKey, colStatObj);
-        iterator.remove();
+  public void removePartitionColStatsFromCache(String dbName, String tblName,
+      List<String> partVals, String colName) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName));
+      if ((tblWrapper != null) && (tblWrapper.isLoaded())) {
+        tblWrapper.removePartitionColStats(partVals, colName);
      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    partitionColStatsCache.putAll(newPartitionColStats);
   }
-  public synchronized void updatePartitionColStatsInCache(String dbName, String tableName,
+  public void updatePartitionColStatsInCache(String dbName, String tableName,
       List<String> partVals, List<ColumnStatisticsObj> colStatsObjs) {
-    for (ColumnStatisticsObj colStatObj : colStatsObjs) {
-      // Get old stats object if present
-      String key = CacheUtils.buildKey(dbName, tableName, partVals, colStatObj.getColName());
-      ColumnStatisticsObj oldStatsObj = partitionColStatsCache.get(key);
-      if (oldStatsObj != null) {
-        // Update existing stat object's field
-        LOG.debug("CachedStore: updating partition column stats for column: "
-            + colStatObj.getColName() + ", of table: " + tableName + " and database: " + dbName);
-        StatObjectConverter.setFieldsIntoOldStats(oldStatsObj, colStatObj);
-      } else {
-        // No stats exist for this key; add a new object to the cache
-        partitionColStatsCache.put(key, colStatObj);
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tableName));
+      if ((tblWrapper != null) && (tblWrapper.isLoaded())) {
+        tblWrapper.updatePartitionColStats(partVals, colStatsObjs);
       }
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
-  public synchronized int getCachedPartitionCount() {
-    return partitionCache.size();
-  }
-
-  public synchronized ColumnStatisticsObj getCachedPartitionColStats(String key) {
-    return partitionColStatsCache.get(key)!=null?partitionColStatsCache.get(key).deepCopy():null;
-  }
-
-  public synchronized void addPartitionColStatsToCache(
-      List<ColStatsObjWithSourceInfo> colStatsForDB) {
-    for (ColStatsObjWithSourceInfo colStatWithSourceInfo : colStatsForDB) {
-      List<String> partVals;
-      try {
-        partVals = Warehouse.getPartValuesFromPartName(colStatWithSourceInfo.getPartName());
-        ColumnStatisticsObj colStatObj = colStatWithSourceInfo.getColStatsObj();
-        String key = CacheUtils.buildKey(colStatWithSourceInfo.getDbName(),
-            colStatWithSourceInfo.getTblName(), partVals, colStatObj.getColName());
-        partitionColStatsCache.put(key, colStatObj);
-      } catch (MetaException e) {
-        LOG.info("Unable to add partition stats for: {} to SharedCache",
-            colStatWithSourceInfo.getPartName(), e);
+  public ColumnStatisticsObj getPartitionColStatsFromCache(String dbName, String tblName,
+      List<String> partVal, String colName) {
+    ColumnStatisticsObj colStatObj = null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName));
+      if ((tblWrapper != null) && (tblWrapper.isLoaded())) {
+        colStatObj = tblWrapper.getPartitionColStats(partVal, colName);
      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
-
-  }
-
-  public synchronized void refreshPartitionColStats(String dbName,
-      List<ColStatsObjWithSourceInfo> colStatsForDB) {
-    LOG.debug("CachedStore: updating cached partition column stats objects for database: {}",
-        dbName);
-    removePartitionColStatsFromCache(dbName);
-    addPartitionColStatsToCache(colStatsForDB);
+    return colStatObj;
   }
-  public synchronized void addAggregateStatsToCache(String dbName, String tblName,
-      AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
-    if (aggrStatsAllPartitions != null) {
-      for (ColumnStatisticsObj colStatObj : aggrStatsAllPartitions.getColStats()) {
-        String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName());
-        List<ColumnStatisticsObj> value = new ArrayList<ColumnStatisticsObj>();
-        value.add(StatsType.ALL.getPosition(), colStatObj);
-        aggrColStatsCache.put(key, value);
-      }
-    }
-    if (aggrStatsAllButDefaultPartition != null) {
-      for (ColumnStatisticsObj colStatObj : aggrStatsAllButDefaultPartition.getColStats()) {
-        String key = CacheUtils.buildKey(dbName, tblName, colStatObj.getColName());
-        List<ColumnStatisticsObj> value = aggrColStatsCache.get(key);
-        if ((value != null) && (value.size() > 0)) {
-          value.add(StatsType.ALLBUTDEFAULT.getPosition(), colStatObj);
-        }
+  public void refreshPartitionColStatsInCache(String dbName, String tblName,
+      List partitionColStats) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName));
+      if ((tblWrapper != null) && (tblWrapper.isLoaded())) {
+        tblWrapper.refreshPartitionColStats(partitionColStats);
      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
   public List<ColumnStatisticsObj> getAggrStatsFromCache(String dbName, String tblName,
       List<String> colNames, StatsType statsType) {
-    List<ColumnStatisticsObj> colStats = new ArrayList<ColumnStatisticsObj>();
-    for (String colName : colNames) {
-      String key = CacheUtils.buildKey(dbName, tblName, colName);
-      List<ColumnStatisticsObj> colStatList = aggrColStatsCache.get(key);
-      // If unable to find stats for a column, return null so we can build stats
-      if (colStatList == null) {
-        return null;
-      }
-      ColumnStatisticsObj colStatObj = colStatList.get(statsType.getPosition());
-      // If unable to find stats for this StatsType, return null so we can build
-      // stats
-      if (colStatObj == null) {
-        return null;
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName));
+      if ((tblWrapper != null) && (tblWrapper.isLoaded())) {
+        return tblWrapper.getAggrPartitionColStats(colNames, statsType);
       }
-      colStats.add(colStatObj);
+    } finally {
+      cacheLock.readLock().unlock();
     }
-    return colStats;
+    return null;
   }
-  public synchronized void removeAggrPartitionColStatsFromCache(String dbName, String tblName) {
-    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
-    Iterator<Entry<String, List<ColumnStatisticsObj>>> iterator =
-        aggrColStatsCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Entry<String, List<ColumnStatisticsObj>> entry = iterator.next();
-      String key = entry.getKey();
-      if (key.toLowerCase().startsWith(partialKey.toLowerCase())) {
-        iterator.remove();
+  public void addAggregateStatsToCache(String dbName, String tblName,
+      AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName));
+      if ((tblWrapper != null) && (tblWrapper.isLoaded())) {
+        tblWrapper.cacheAggrPartitionColStats(aggrStatsAllPartitions,
+            aggrStatsAllButDefaultPartition);
      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
-  public synchronized void refreshAggregateStatsCache(String dbName, String tblName,
+  public void refreshAggregateStatsInCache(String dbName, String tblName,
       AggrStats aggrStatsAllPartitions, AggrStats aggrStatsAllButDefaultPartition) {
-    LOG.debug("CachedStore: updating aggregate stats cache for database: {}, table: {}", dbName,
-        tblName);
-    removeAggrPartitionColStatsFromCache(dbName, tblName);
-    addAggregateStatsToCache(dbName, tblName, aggrStatsAllPartitions,
-        aggrStatsAllButDefaultPartition);
-  }
-
-  public synchronized void addTableColStatsToCache(String dbName, String tableName,
-      List<ColumnStatisticsObj> colStatsForTable) {
-    for (ColumnStatisticsObj colStatObj : colStatsForTable) {
-      String key = CacheUtils.buildKey(dbName, tableName, colStatObj.getColName());
-      tableColStatsCache.put(key, colStatObj);
+    try {
+      cacheLock.readLock().lock();
+      TableWrapper tblWrapper = tableCache.get(CacheUtils.buildKey(dbName, tblName));
+      if ((tblWrapper != null) && (tblWrapper.isLoaded())) {
+        tblWrapper.refreshAggrPartitionColStats(aggrStatsAllPartitions,
+            aggrStatsAllButDefaultPartition);
+      }
+    } finally {
+      cacheLock.readLock().unlock();
     }
   }
-  public synchronized void refreshTableColStats(String dbName, String tableName,
-      List<ColumnStatisticsObj> colStatsForTable) {
-    LOG.debug("CachedStore: updating cached table column stats objects for database: " + dbName
-        + " and table: " + tableName);
-    // Remove all old cache entries for this table
-    removeTableColStatsFromCache(dbName, tableName);
-    // Add new entries to cache
-    addTableColStatsToCache(dbName, tableName, colStatsForTable);
-  }
-
   public void increSd(StorageDescriptor sd, byte[] sdHash) {
     ByteArrayWrapper byteArray = new ByteArrayWrapper(sdHash);
     if (sdCache.containsKey(byteArray)) {
@@ -640,45 +1319,6 @@ public StorageDescriptor getSdFromCache(byte[] sdHash) {
     return sdWrapper.getSd();
   }
-
-  // Replace databases in databaseCache with the new list
-  public synchronized void refreshDatabases(List<Database> databases) {
-    LOG.debug("CachedStore: updating cached database objects");
-    for (String dbName : listCachedDatabases()) {
-      removeDatabaseFromCache(dbName);
-    }
-    for (Database db : databases) {
-      addDatabaseToCache(db.getName(), db);
-    }
-  }
-
-  // Replace tables in tableCache with the new list
-  public synchronized void refreshTables(String dbName, List<Table> tables) {
-    LOG.debug("CachedStore: updating cached table objects for database: " + dbName);
-    for (Table tbl : listCachedTables(dbName)) {
-      removeTableFromCache(dbName, tbl.getTableName());
-    }
-    for (Table tbl : tables) {
-      addTableToCache(dbName, tbl.getTableName(), tbl);
-    }
-  }
-
-  public synchronized void refreshPartitions(String dbName, String tblName,
-      List<Partition> partitions) {
-    LOG.debug("CachedStore: updating cached partition objects for database: " + dbName
-        + " and table: " + tblName);
-    Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
-    while (iterator.hasNext()) {
-      PartitionWrapper partitionWrapper = iterator.next().getValue();
-      if (partitionWrapper.getPartition().getDbName().equals(dbName)
-          && partitionWrapper.getPartition().getTableName().equals(tblName)) {
-        iterator.remove();
-      }
-    }
-    for (Partition part : partitions) {
-      addPartitionToCache(dbName, tblName, part);
-    }
-  }
-
   @VisibleForTesting
   Map<String, Database> getDatabaseCache() {
     return databaseCache;
@@ -690,17 +1330,7 @@ public synchronized void refreshPartitions(String dbName, String tblName,
   }
   @VisibleForTesting
-  Map<String, PartitionWrapper> getPartitionCache() {
-    return partitionCache;
-  }
-
-  @VisibleForTesting
   Map<ByteArrayWrapper, StorageDescriptorWrapper> getSdCache() {
     return sdCache;
   }
-
-  @VisibleForTesting
-  Map<String, ColumnStatisticsObj> getPartitionColStatsCache() {
-    return partitionColStatsCache;
-  }
 }
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index 9100c73beb..d7cabd9497 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -1053,11 +1053,4 @@ public void dropWMTriggerToPoolMapping(String resourcePlanName, String triggerNa
       String poolPath) throws NoSuchObjectException, InvalidOperationException, MetaException {
     objectStore.dropWMTriggerToPoolMapping(resourcePlanName, triggerName, poolPath);
   }
-
-  @Override
-  public List getPartitionColStatsForDatabase(String dbName)
-      throws MetaException, NoSuchObjectException {
-    // TODO Auto-generated method stub
-    return null;
-  }
 }
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index 86e72d8d76..aaf605d454 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -1050,11 +1050,4 @@ public void createWMTriggerToPoolMapping(String resourcePlanName, String trigger
   public void dropWMTriggerToPoolMapping(String resourcePlanName, String triggerName,
       String poolPath) throws NoSuchObjectException, InvalidOperationException, MetaException {
   }
-
-  @Override
-  public List getPartitionColStatsForDatabase(String dbName)
-      throws MetaException, NoSuchObjectException {
-    // TODO Auto-generated method stub
-    return null;
-  }
 }
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
index bd61df654a..9a538820f4 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/cache/TestCachedStore.java
@@ -28,9 +28,7 @@
 import org.apache.hadoop.hive.metastore.MetaStoreTestUtils;
 import org.apache.hadoop.hive.metastore.ObjectStore;
 import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.TestObjectStore.MockPartitionExpressionProxy;
 import org.apache.hadoop.hive.metastore.api.AggrStats;
-import org.apache.hadoop.hive.metastore.api.BasicTxnInfo;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
@@ -75,9 +73,6 @@ public void setUp() throws Exception {
     sharedCache = new SharedCache();
     sharedCache.getDatabaseCache().clear();
     sharedCache.getTableCache().clear();
-    sharedCache.getPartitionCache().clear();
-    sharedCache.getSdCache().clear();
-    sharedCache.getPartitionColStatsCache().clear();
   }
   /**********************************************************************************************
@@ -508,9 +503,9 @@ public void testSharedStoreDb() {
     Database newDb1 = new Database();
     newDb1.setName("db1");
-    sharedCache.addDatabaseToCache("db1", db1);
-    sharedCache.addDatabaseToCache("db2", db2);
-    sharedCache.addDatabaseToCache("db3", db3);
+    sharedCache.addDatabaseToCache(db1);
+    sharedCache.addDatabaseToCache(db2);
+    sharedCache.addDatabaseToCache(db3);
     Assert.assertEquals(sharedCache.getCachedDatabaseCount(), 3);
@@ -662,22 +657,18 @@ public void testSharedStorePartition() {
     sharedCache.addPartitionToCache("db1", "tbl1", part3);
     sharedCache.addPartitionToCache("db1", "tbl2", part1);
-    Assert.assertEquals(sharedCache.getCachedPartitionCount(), 4);
     Assert.assertEquals(sharedCache.getSdCache().size(), 2);
     Partition t = sharedCache.getPartitionFromCache("db1", "tbl1", Arrays.asList("201701"));
     Assert.assertEquals(t.getSd().getLocation(), "loc1");
     sharedCache.removePartitionFromCache("db1", "tbl2", Arrays.asList("201701"));
-    Assert.assertEquals(sharedCache.getCachedPartitionCount(), 3);
     Assert.assertEquals(sharedCache.getSdCache().size(), 2);
     sharedCache.alterPartitionInCache("db1", "tbl1", Arrays.asList("201701"), newPart1);
-    Assert.assertEquals(sharedCache.getCachedPartitionCount(), 3);
     Assert.assertEquals(sharedCache.getSdCache().size(), 3);
     sharedCache.removePartitionFromCache("db1", "tbl1", Arrays.asList("201702"));
-    Assert.assertEquals(sharedCache.getCachedPartitionCount(), 2);
     Assert.assertEquals(sharedCache.getSdCache().size(), 2);
   }
@@ -752,10 +743,10 @@ public void testPartitionAggrStats() throws Exception {
     String dbName = "testTableColStatsOps1";
     String tblName = "tbl1";
     String colName = "f1";
-    
+
     Database db = new Database(dbName, null, "some_location", null);
     cachedStore.createDatabase(db);
-    
+
     List<FieldSchema> cols = new ArrayList<>();
     cols.add(new FieldSchema(colName, "int", null));
     List<FieldSchema> partCols = new ArrayList<>();
@@ -763,29 +754,29 @@
     StorageDescriptor sd = new StorageDescriptor(cols, null, "input", "output", false, 0,
         new SerDeInfo("serde", "seriallib", new HashMap<>()), null, null, null);
-    
+
     Table tbl = new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(),
         null, null, TableType.MANAGED_TABLE.toString());
     cachedStore.createTable(tbl);
-    
+
     List<String> partVals1 = new ArrayList<>();
     partVals1.add("1");
     List<String> partVals2 = new ArrayList<>();
     partVals2.add("2");
-    
+
     Partition ptn1 = new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>());
     cachedStore.addPartition(ptn1);
     Partition ptn2 = new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>());
     cachedStore.addPartition(ptn2);
-    
+
     ColumnStatistics stats = new ColumnStatistics();
     ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName);
     statsDesc.setPartName("col");
     List<ColumnStatisticsObj> colStatObjs = new ArrayList<>();
-    
+
     ColumnStatisticsData data = new ColumnStatisticsData();
     ColumnStatisticsObj colStats = new ColumnStatisticsObj(colName, "int", data);
     LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector();
@@ -795,15 +786,15 @@ public void testPartitionAggrStats() throws Exception {
     longStats.setNumDVs(30);
     data.setLongStats(longStats);
     colStatObjs.add(colStats);
-    
+
     stats.setStatsDesc(statsDesc);
     stats.setStatsObj(colStatObjs);
-    
+
     cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals1);
-    
+
     longStats.setNumDVs(40);
     cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals2);
-    
+
     List<String> colNames = new ArrayList<>();
     colNames.add(colName);
     List<String> aggrPartVals = new ArrayList<>();
@@ -822,10 +813,10 @@ public void testPartitionAggrStatsBitVector() throws Exception {
     String dbName = "testTableColStatsOps2";
     String tblName = "tbl2";
     String colName = "f1";
-    
+
     Database db = new Database(dbName, null, "some_location", null);
     cachedStore.createDatabase(db);
-    
+
     List<FieldSchema> cols = new ArrayList<>();
     cols.add(new FieldSchema(colName, "int", null));
     List<FieldSchema> partCols = new ArrayList<>();
@@ -833,29 +824,29 @@
     StorageDescriptor sd = new StorageDescriptor(cols, null, "input", "output", false, 0,
         new SerDeInfo("serde", "seriallib", new HashMap<>()), null, null, null);
-    
+
     Table tbl = new Table(tblName, dbName, null, 0, 0, 0, sd, partCols, new HashMap<>(),
         null, null, TableType.MANAGED_TABLE.toString());
     cachedStore.createTable(tbl);
-    
+
     List<String> partVals1 = new ArrayList<>();
     partVals1.add("1");
     List<String> partVals2 = new ArrayList<>();
     partVals2.add("2");
-    
+
     Partition ptn1 = new Partition(partVals1, dbName, tblName, 0, 0, sd, new HashMap<>());
     cachedStore.addPartition(ptn1);
     Partition ptn2 = new Partition(partVals2, dbName, tblName, 0, 0, sd, new HashMap<>());
     cachedStore.addPartition(ptn2);
-    
+
     ColumnStatistics stats = new ColumnStatistics();
     ColumnStatisticsDesc statsDesc = new ColumnStatisticsDesc(true, dbName, tblName);
     statsDesc.setPartName("col");
     List<ColumnStatisticsObj> colStatObjs = new ArrayList<>();
-    
+
     ColumnStatisticsData data = new ColumnStatisticsData();
     ColumnStatisticsObj colStats = new ColumnStatisticsObj(colName, "int", data);
     LongColumnStatsDataInspector longStats = new LongColumnStatsDataInspector();
@@ -863,21 +854,21 @@ public void testPartitionAggrStatsBitVector() throws Exception {
     longStats.setHighValue(100);
     longStats.setNumNulls(50);
     longStats.setNumDVs(30);
-    
+
     HyperLogLog hll = HyperLogLog.builder().build();
     hll.addLong(1);
     hll.addLong(2);
     hll.addLong(3);
     longStats.setBitVectors(hll.serialize());
-    
+
     data.setLongStats(longStats);
     colStatObjs.add(colStats);
-    
+
     stats.setStatsDesc(statsDesc);
     stats.setStatsObj(colStatObjs);
-    
+
     cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals1);
-    
+
     longStats.setNumDVs(40);
     hll = HyperLogLog.builder().build();
     hll.addLong(2);
@@ -885,9 +876,9 @@ public void testPartitionAggrStatsBitVector() throws Exception {
     hll.addLong(4);
     hll.addLong(5);
     longStats.setBitVectors(hll.serialize());
-    
+
     cachedStore.updatePartitionColumnStatistics(stats.deepCopy(), partVals2);
-    
+
     List<String> colNames = new ArrayList<>();
     colNames.add(colName);
     List<String> aggrPartVals = new ArrayList<>();
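The new SharedCache accessors in the hunks above all share one read-through shape: take the cache-wide read lock, look up the table's TableWrapper under CacheUtils.buildKey(dbName, tblName), and delegate to the wrapper only when tblWrapper.isLoaded() is true, otherwise fall through so the caller can go to the underlying store. The following is a minimal, self-contained sketch of that pattern; SharedCacheSketch, TableWrapperSketch, the "#" delimiter and the Optional return type are simplified stand-ins for illustration, not the actual metastore classes, key format, or API.

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative stand-in for a per-table wrapper: holds its own partition map
// and a "loaded" flag, mirroring the tblWrapper.isLoaded() check in the patch.
class TableWrapperSketch {
  private volatile boolean loaded;
  private final Map<String, String> partitions = new ConcurrentHashMap<>();

  boolean isLoaded() { return loaded; }
  void markLoaded() { loaded = true; }
  void putPartition(String partKey, String location) { partitions.put(partKey, location); }
  String getPartition(String partKey) { return partitions.get(partKey); }
}

// Illustrative stand-in for the cache-wide structure: one read/write lock plus
// a map from a db#table key to that table's wrapper.
class SharedCacheSketch {
  private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
  private final Map<String, TableWrapperSketch> tableCache = new ConcurrentHashMap<>();

  // Simplified key builder; the real CacheUtils.buildKey uses its own delimiter.
  static String buildKey(String dbName, String tblName) {
    return dbName + "#" + tblName;
  }

  void putTable(String dbName, String tblName, TableWrapperSketch wrapper) {
    cacheLock.writeLock().lock();
    try {
      tableCache.put(buildKey(dbName, tblName), wrapper);
    } finally {
      cacheLock.writeLock().unlock();
    }
  }

  // The read-through pattern: read lock -> find wrapper -> delegate only if loaded,
  // otherwise return empty so the caller can fall back to the raw store.
  Optional<String> getPartitionFromCache(String dbName, String tblName, String partKey) {
    cacheLock.readLock().lock();
    try {
      TableWrapperSketch wrapper = tableCache.get(buildKey(dbName, tblName));
      if (wrapper != null && wrapper.isLoaded()) {
        return Optional.ofNullable(wrapper.getPartition(partKey));
      }
      return Optional.empty();
    } finally {
      cacheLock.readLock().unlock();
    }
  }

  public static void main(String[] args) {
    SharedCacheSketch cache = new SharedCacheSketch();
    TableWrapperSketch tbl = new TableWrapperSketch();
    tbl.putPartition("201701", "loc1");
    tbl.markLoaded();
    cache.putTable("db1", "tbl1", tbl);
    System.out.println(cache.getPartitionFromCache("db1", "tbl1", "201701")); // Optional[loc1]
    System.out.println(cache.getPartitionFromCache("db1", "tbl2", "201701")); // Optional.empty
  }
}

The coarse lock is only taken in read mode for these lookups because they never mutate the table map itself; per-table state is guarded inside the wrapper, which appears to be why the patch can drop the old synchronized modifiers on these methods.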