diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 26b1b18b47..2cbdaf12ba 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -900,6 +900,17 @@ private static void populateLlapDaemonVarsSet(Set llapDaemonVarsSetLocal "hive.metastore.cached.rawstore.cache.update.frequency", "60", new TimeValidator( TimeUnit.SECONDS), "The time after which metastore cache is updated from metastore DB."), + METASTORE_CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST( + "hive.metastore.cached.rawstore.cached.object.whitelist", "*", "Comma separated list of regular expressions \n " + + "to select the tables (and its partitions, stats etc) that will be cached by CachedStore. \n" + + "This can be used in conjunction with hive.metastore.cached.rawstore.cached.object.blacklist. \n" + + "Example: .*, db1.*, db2\\.tbl.*. The last item can potentially override patterns specified before."), + METASTORE_CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST( + "hive.metastore.cached.rawstore.cached.object.blacklist", "", "Comma separated list of regular expressions \n " + + "to filter out the tables (and its partitions, stats etc) that will be cached by CachedStore. \n" + + "This can be used in conjunction with hive.metastore.cached.rawstore.cached.object.whitelist. \n" + + "Example: db2.*, db3\\.tbl1, db3\\..*. The last item can potentially override patterns specified before. \n" + + "The blacklist also overrides the whitelist."), METASTORE_TXN_STORE_IMPL("hive.metastore.txn.store.impl", "org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler", "Name of class that implements org.apache.hadoop.hive.metastore.txn.TxnStore. 
This " + diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java index c61f27b326..d67b72428b 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java @@ -21,6 +21,8 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -31,6 +33,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; @@ -124,6 +128,8 @@ private static ReentrantReadWriteLock partitionColStatsCacheLock = new ReentrantReadWriteLock( true); private static AtomicBoolean isPartitionColStatsCacheDirty = new AtomicBoolean(false); + private static List whitelistPatterns = new ArrayList(); + private static List blacklistPatterns = new ArrayList(); private RawStore rawStore = null; private Configuration conf; private PartitionExpressionProxy expressionProxy = null; @@ -247,21 +253,30 @@ static void prewarm(RawStore rawStore) throws Exception { // Prevents throwing exceptions in our raw store calls since we're not using RawStoreProxy Deadline.registerIfNot(1000000); List dbNames = rawStore.getAllDatabases(); + LOG.info("Number of databases to prewarm: " + dbNames.size()); SharedCache sharedCache = sharedCacheWrapper.getUnsafe(); - for (String dbName : dbNames) { + for (int i = 0; i < dbNames.size(); i++) { + String dbName = StringUtils.normalizeIdentifier(dbNames.get(i)); + LOG.info("Caching database: {}. 
Cached {} / {} databases so far.", dbName, i, dbNames.size()); Database db = rawStore.getDatabase(dbName); - sharedCache.addDatabaseToCache(StringUtils.normalizeIdentifier(dbName), db); + sharedCache.addDatabaseToCache(dbName, db); List tblNames = rawStore.getAllTables(dbName); - for (String tblName : tblNames) { + LOG.debug("Tables in database: {} : {}", dbName, tblNames); + for (int j = 0; j < tblNames.size(); j++) { + String tblName = StringUtils.normalizeIdentifier(tblNames.get(j)); + if (!shouldCacheTable(dbName, tblName)) { + LOG.info("Not caching database: {}'s table: {}", dbName, tblName); + continue; + } + LOG.info("Caching database: {}'s table: {}. Cached {} / {} tables so far.", dbName, + tblName, j, tblNames.size()); Table table = rawStore.getTable(dbName, tblName); - sharedCache.addTableToCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), table); + sharedCache.addTableToCache(dbName, tblName, table); Deadline.startTimer("getPartitions"); List partitions = rawStore.getPartitions(dbName, tblName, Integer.MAX_VALUE); Deadline.stopTimer(); for (Partition partition : partitions) { - sharedCache.addPartitionToCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partition); + sharedCache.addPartitionToCache(dbName, tblName, partition); } // Cache partition column stats Deadline.startTimer("getColStatsForTablePartitions"); @@ -278,11 +293,12 @@ static void prewarm(RawStore rawStore) throws Exception { rawStore.getTableColumnStatistics(dbName, tblName, colNames); Deadline.stopTimer(); if ((tableColStats != null) && (tableColStats.getStatsObjSize() > 0)) { - sharedCache.addTableColStatsToCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), tableColStats.getStatsObj()); + sharedCache.addTableColStatsToCache(dbName, tblName, tableColStats.getStatsObj()); } } } + // Notify all blocked threads that prewarm is complete now + 
sharedCacheWrapper.notifyAllBlocked(); } @VisibleForTesting @@ -297,6 +313,12 @@ public Thread newThread(Runnable r) { return t; } }); + whitelistPatterns = createPatterns(MetastoreConf.getAsString(conf, + MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST)); + blacklistPatterns = createPatterns(MetastoreConf.getAsString(conf, + MetastoreConf.ConfVars.CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST)); + // The last specified blacklist pattern gets precedence + Collections.reverse(blacklistPatterns); if (!MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST)) { cacheRefreshPeriod = MetastoreConf.getTimeVar(conf, ConfVars.CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY, @@ -354,9 +376,12 @@ public void run() { if (isFirstRun) { while (isFirstRun) { try { + long startTime = System.nanoTime(); LOG.info("Prewarming CachedStore"); prewarm(rawStore); LOG.info("CachedStore initialized"); + long endTime = System.nanoTime(); + LOG.info("Time taken in prewarming = " + (endTime - startTime) / 1000000 + "ms"); } catch (Exception e) { LOG.error("Prewarm failure", e); sharedCacheWrapper.updateInitState(e, false); @@ -384,6 +409,9 @@ public void update() { updateTables(rawStore, dbName); List tblNames = getAllTablesInternal(dbName, sharedCacheWrapper.getUnsafe()); for (String tblName : tblNames) { + if (!shouldCacheTable(dbName, tblName)) { + continue; + } // Update the partitions for a table in cache updateTablePartitions(rawStore, dbName, tblName); // Update the table column stats for a table in cache @@ -433,6 +461,9 @@ private void updateTables(RawStore rawStore, String dbName) { try { List tblNames = rawStore.getAllTables(dbName); for (String tblName : tblNames) { + if (!shouldCacheTable(dbName, tblName)) { + continue; + } Table table = rawStore.getTable(StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName)); @@ -576,7 +607,9 @@ public void rollbackTransaction() { public void createDatabase(Database db) throws InvalidObjectException, MetaException 
{ rawStore.createDatabase(db); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInit(10000)) { + return; + } try { // Wait if background cache update is happening databaseCacheLock.readLock().lock(); @@ -593,7 +626,7 @@ public Database getDatabase(String dbName) throws NoSuchObjectException { SharedCache sharedCache; try { sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized()) { return rawStore.getDatabase(dbName); } } catch (MetaException e) { @@ -611,7 +644,9 @@ public boolean dropDatabase(String dbname) throws NoSuchObjectException, MetaExc boolean succ = rawStore.dropDatabase(dbname); if (succ) { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInit(10000)) { + return succ; + } try { // Wait if background cache update is happening databaseCacheLock.readLock().lock(); @@ -630,7 +665,9 @@ public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectExce boolean succ = rawStore.alterDatabase(dbName, db); if (succ) { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInit(10000)) { + return succ; + } try { // Wait if background cache update is happening databaseCacheLock.readLock().lock(); @@ -646,7 +683,7 @@ public boolean alterDatabase(String dbName, Database db) throws NoSuchObjectExce @Override public List getDatabases(String pattern) throws MetaException { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized()) { return rawStore.getDatabases(pattern); } List results = new ArrayList<>(); @@ -662,7 +699,7 @@ public boolean alterDatabase(String dbName, Database db) throws 
NoSuchObjectExce @Override public List getAllDatabases() throws MetaException { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized()) { return rawStore.getAllDatabases(); } return sharedCache.listCachedDatabases(); @@ -704,34 +741,42 @@ private void validateTableType(Table tbl) { @Override public void createTable(Table tbl) throws InvalidObjectException, MetaException { rawStore.createTable(tbl); - validateTableType(tbl); + String dbName = StringUtils.normalizeIdentifier(tbl.getDbName()); + String tblName = StringUtils.normalizeIdentifier(tbl.getTableName()); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; + if ((sharedCache == null) || !shouldCacheTable(dbName, tblName) + || !sharedCacheWrapper.waitForInit(10000)) { + return; + } + validateTableType(tbl); try { // Wait if background cache update is happening tableCacheLock.readLock().lock(); isTableCacheDirty.set(true); - sharedCache.addTableToCache(StringUtils.normalizeIdentifier(tbl.getDbName()), - StringUtils.normalizeIdentifier(tbl.getTableName()), tbl); + sharedCache.addTableToCache(dbName, tblName, tbl); } finally { tableCacheLock.readLock().unlock(); } } @Override - public boolean dropTable(String dbName, String tableName) throws MetaException, + public boolean dropTable(String dbName, String tblName) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { - boolean succ = rawStore.dropTable(dbName, tableName); + boolean succ = rawStore.dropTable(dbName, tblName); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInit(10000) + || !isCachedTable(dbName, tblName)) { + return succ; + } // Remove table try { // Wait if background 
table cache update is happening tableCacheLock.readLock().lock(); isTableCacheDirty.set(true); - sharedCache.removeTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName)); + sharedCache.removeTableFromCache(dbName, tblName); } finally { tableCacheLock.readLock().unlock(); } @@ -740,8 +785,7 @@ public boolean dropTable(String dbName, String tableName) throws MetaException, // Wait if background table col stats cache update is happening tableColStatsCacheLock.readLock().lock(); isTableColStatsCacheDirty.set(true); - sharedCache.removeTableColStatsFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName)); + sharedCache.removeTableColStatsFromCache(dbName, tblName); } finally { tableColStatsCacheLock.readLock().unlock(); } @@ -750,13 +794,15 @@ public boolean dropTable(String dbName, String tableName) throws MetaException, } @Override - public Table getTable(String dbName, String tableName) throws MetaException { + public Table getTable(String dbName, String tblName) throws MetaException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTable(dbName, tableName); + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { + return rawStore.getTable(dbName, tblName); } - Table tbl = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName)); + Table tbl = sharedCache.getTableFromCache(dbName, tblName); if (tbl != null) { tbl.unsetPrivileges(); tbl.setRewriteEnabled(tbl.isRewriteEnabled()); @@ -768,14 +814,18 @@ public Table getTable(String dbName, String tableName) throws MetaException { public boolean addPartition(Partition part) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartition(part); if 
(succ) { + String dbName = StringUtils.normalizeIdentifier(part.getDbName()); + String tblName = StringUtils.normalizeIdentifier(part.getTableName()); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInit(10000) + || !isCachedTable(dbName, tblName)) { + return succ; + } try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - sharedCache.addPartitionToCache(StringUtils.normalizeIdentifier(part.getDbName()), - StringUtils.normalizeIdentifier(part.getTableName()), part); + sharedCache.addPartitionToCache(dbName, tblName, part); } finally { partitionCacheLock.readLock().unlock(); } @@ -788,15 +838,19 @@ public boolean addPartitions(String dbName, String tblName, List part throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(dbName, tblName, parts); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInit(10000) + || !isCachedTable(dbName, tblName)) { + return succ; + } try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); for (Partition part : parts) { - sharedCache.addPartitionToCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), part); + sharedCache.addPartitionToCache(dbName, tblName, part); } } finally { partitionCacheLock.readLock().unlock(); @@ -810,8 +864,13 @@ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy p boolean ifNotExists) throws InvalidObjectException, MetaException { boolean succ = rawStore.addPartitions(dbName, tblName, partitionSpec, ifNotExists); if (succ) { + dbName = 
StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInit(10000) + || !isCachedTable(dbName, tblName)) { + return succ; + } try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); @@ -819,8 +878,7 @@ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy p PartitionSpecProxy.PartitionIterator iterator = partitionSpec.getPartitionIterator(); while (iterator.hasNext()) { Partition part = iterator.next(); - sharedCache.addPartitionToCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), part); + sharedCache.addPartitionToCache(dbName, tblName, part); } } finally { partitionCacheLock.readLock().unlock(); @@ -830,15 +888,17 @@ public boolean addPartitions(String dbName, String tblName, PartitionSpecProxy p } @Override - public Partition getPartition(String dbName, String tableName, List part_vals) + public Partition getPartition(String dbName, String tblName, List part_vals) throws MetaException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getPartition(dbName, tableName, part_vals); + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { + return rawStore.getPartition(dbName, tblName, part_vals); } Partition part = - sharedCache.getPartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), part_vals); + sharedCache.getPartitionFromCache(dbName, tblName, part_vals); if (part != null) { part.unsetPrivileges(); } else { @@ -848,30 +908,36 @@ public Partition getPartition(String dbName, String tableName, List part 
} @Override - public boolean doesPartitionExist(String dbName, String tableName, + public boolean doesPartitionExist(String dbName, String tblName, List part_vals) throws MetaException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.doesPartitionExist(dbName, tableName, part_vals); + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { + return rawStore.doesPartitionExist(dbName, tblName, part_vals); } - return sharedCache.existPartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), part_vals); + return sharedCache.existPartitionFromCache(dbName, tblName, part_vals); } @Override - public boolean dropPartition(String dbName, String tableName, List part_vals) + public boolean dropPartition(String dbName, String tblName, List part_vals) throws MetaException, NoSuchObjectException, InvalidObjectException, InvalidInputException { - boolean succ = rawStore.dropPartition(dbName, tableName, part_vals); + boolean succ = rawStore.dropPartition(dbName, tblName, part_vals); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInit(10000) + || !isCachedTable(dbName, tblName)) { + return succ; + } // Remove partition try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - sharedCache.removePartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), part_vals); + sharedCache.removePartitionFromCache(dbName, tblName, part_vals); } finally { 
partitionCacheLock.readLock().unlock(); } @@ -880,8 +946,7 @@ public boolean dropPartition(String dbName, String tableName, List part_ // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - sharedCache.removePartitionColStatsFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), part_vals); + sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals); } finally { partitionColStatsCacheLock.readLock().unlock(); } @@ -890,14 +955,16 @@ public boolean dropPartition(String dbName, String tableName, List part_ } @Override - public List getPartitions(String dbName, String tableName, int max) + public List getPartitions(String dbName, String tblName, int max) throws MetaException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getPartitions(dbName, tableName, max); + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { + return rawStore.getPartitions(dbName, tblName, max); } - List parts = sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), max); + List parts = sharedCache.listCachedPartitions(dbName, tblName, max); if (parts != null) { for (Partition part : parts) { part.unsetPrivileges(); @@ -910,87 +977,112 @@ public boolean dropPartition(String dbName, String tableName, List part_ public void alterTable(String dbName, String tblName, Table newTable) throws InvalidObjectException, MetaException { rawStore.alterTable(dbName, tblName, newTable); - validateTableType(newTable); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); + String newTblName = 
StringUtils.normalizeIdentifier(newTable.getTableName()); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; - // Update table cache - try { - // Wait if background cache update is happening - tableCacheLock.readLock().lock(); - isTableCacheDirty.set(true); - sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), newTable); - } finally { - tableCacheLock.readLock().unlock(); - } - // Update partition cache (key might have changed since table name is a - // component of key) - try { - // Wait if background cache update is happening - partitionCacheLock.readLock().lock(); - isPartitionCacheDirty.set(true); - sharedCache.alterTableInPartitionCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), newTable); - } finally { - partitionCacheLock.readLock().unlock(); + if ((sharedCache == null) || !sharedCacheWrapper.waitForInit(10000) + || !isCachedTable(dbName, tblName)) { + return; + } else { + if (shouldCacheTable(dbName, newTblName)) { + validateTableType(newTable); + // Update table cache + try { + // Wait if background cache update is happening + tableCacheLock.readLock().lock(); + isTableCacheDirty.set(true); + sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), newTable); + } finally { + tableCacheLock.readLock().unlock(); + } + // Update partition cache (key might have changed since table name is a + // component of key) + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + sharedCache.alterTableInPartitionCache(StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), newTable); + } finally { + partitionCacheLock.readLock().unlock(); + } + } else { + // Remove the table and its cached partitions, stats etc, + // since it does not pass the whitelist/blacklist filter. 
+ // Remove table + try { + // Wait if background cache update is happening + tableCacheLock.readLock().lock(); + isTableCacheDirty.set(true); + sharedCache.removeTableFromCache(dbName, tblName); + } finally { + tableCacheLock.readLock().unlock(); + } + // Remove partitions + try { + // Wait if background cache update is happening + partitionCacheLock.readLock().lock(); + isPartitionCacheDirty.set(true); + sharedCache.removePartitionsFromCache(dbName, tblName); + } finally { + partitionCacheLock.readLock().unlock(); + } + // Remove partition col stats + try { + // Wait if background cache update is happening + partitionColStatsCacheLock.readLock().lock(); + isPartitionColStatsCacheDirty.set(true); + sharedCache.removePartitionColStatsFromCache(dbName, tblName); + } finally { + partitionColStatsCacheLock.readLock().unlock(); + } + } } } @Override - public List getTables(String dbName, String pattern) - throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTables(dbName, pattern); - } - List tableNames = new ArrayList<>(); - for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), pattern)) { - tableNames.add(table.getTableName()); - } - } - return tableNames; + public List getTables(String dbName, String pattern) throws MetaException { + // We can't return table names from cache since it may not have all tables cached + return rawStore.getTables(dbName, pattern); } @Override - public List getTables(String dbName, String pattern, - TableType tableType) throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTables(dbName, pattern); - } - List tableNames = new ArrayList<>(); - for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), pattern) && - 
table.getTableType().equals(tableType.toString())) { - tableNames.add(table.getTableName()); - } - } - return tableNames; + public List getTables(String dbName, String pattern, TableType tableType) + throws MetaException { + // We can't return table names from cache since it may not have all tables cached + return rawStore.getTables(dbName, pattern); } @Override - public List getTableMeta(String dbNames, String tableNames, - List tableTypes) throws MetaException { + public List getTableMeta(String dbNames, String tableNames, List tableTypes) + throws MetaException { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTableMeta(dbNames, tableNames, tableTypes); + if (!(sharedCache == null) && whitelistPatterns.isEmpty() && blacklistPatterns.isEmpty() + && sharedCacheWrapper.isInitialized()) { + return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(dbNames), + StringUtils.normalizeIdentifier(tableNames), tableTypes); } - return sharedCache.getTableMeta(StringUtils.normalizeIdentifier(dbNames), - StringUtils.normalizeIdentifier(tableNames), tableTypes); + return rawStore.getTableMeta(dbNames, tableNames, tableTypes); } @Override - public List getTableObjectsByName(String dbName, - List tblNames) throws MetaException, UnknownDBException { + public List
getTableObjectsByName(String dbName, List tblNames) + throws MetaException, UnknownDBException { + dbName = StringUtils.normalizeIdentifier(dbName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized()) { return rawStore.getTableObjectsByName(dbName, tblNames); } List
tables = new ArrayList<>(); for (String tblName : tblNames) { - tables.add(sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName))); + tblName = StringUtils.normalizeIdentifier(tblName); + Table tbl = sharedCache.getTableFromCache(dbName, tblName); + if (tbl == null) { + tbl = rawStore.getTable(dbName, tblName); + } + tables.add(tbl); } return tables; } @@ -998,10 +1090,12 @@ public void alterTable(String dbName, String tblName, Table newTable) @Override public List getAllTables(String dbName) throws MetaException { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getAllTables(dbName); + if (!(sharedCache == null) && whitelistPatterns.isEmpty() && blacklistPatterns.isEmpty() + && sharedCacheWrapper.isInitialized()) { + return getAllTablesInternal(dbName, sharedCache); } - return getAllTablesInternal(dbName, sharedCache); + // We can't return table names from cache since it may not have all tables cached + return rawStore.getAllTables(dbName); } private static List getAllTablesInternal(String dbName, SharedCache sharedCache) { @@ -1013,37 +1107,39 @@ public void alterTable(String dbName, String tblName, Table newTable) } @Override - public List listTableNamesByFilter(String dbName, String filter, - short max_tables) throws MetaException, UnknownDBException { + public List listTableNamesByFilter(String dbName, String filter, short max_tables) + throws MetaException, UnknownDBException { SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.listTableNamesByFilter(dbName, filter, max_tables); - } - List tableNames = new ArrayList<>(); - int count = 0; - for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { - if (CacheUtils.matches(table.getTableName(), filter) - && (max_tables == -1 || count < max_tables)) { - tableNames.add(table.getTableName()); - count++; + if (!(sharedCache == 
null) && whitelistPatterns.isEmpty() && blacklistPatterns.isEmpty() + && sharedCacheWrapper.isInitialized()) { + List tableNames = new ArrayList<>(); + int count = 0; + for (Table table : sharedCache.listCachedTables(StringUtils.normalizeIdentifier(dbName))) { + if (CacheUtils.matches(table.getTableName(), filter) + && (max_tables == -1 || count < max_tables)) { + tableNames.add(table.getTableName()); + count++; + } } + return tableNames; } - return tableNames; + return rawStore.listTableNamesByFilter(dbName, filter, max_tables); } @Override public List listPartitionNames(String dbName, String tblName, short max_parts) throws MetaException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { return rawStore.listPartitionNames(dbName, tblName, max_parts); } List partitionNames = new ArrayList<>(); - Table t = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); + Table t = sharedCache.getTableFromCache(dbName, tblName); int count = 0; - for (Partition part : sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), max_parts)) { + for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, max_parts)) { if (max_parts == -1 || count < max_parts) { partitionNames.add(Warehouse.makePartName(t.getPartitionKeys(), part.getValues())); } @@ -1052,32 +1148,36 @@ public void alterTable(String dbName, String tblName, Table newTable) } @Override - public PartitionValuesResponse listPartitionValues(String db_name, String tbl_name, List cols, - boolean applyDistinct, String filter, boolean ascending, - List order, long maxParts) throws MetaException { + public PartitionValuesResponse 
listPartitionValues(String db_name, String tbl_name, + List cols, boolean applyDistinct, String filter, boolean ascending, + List order, long maxParts) throws MetaException { throw new UnsupportedOperationException(); } @Override - public List listPartitionNamesByFilter(String db_name, - String tbl_name, String filter, short max_parts) throws MetaException { + public List listPartitionNamesByFilter(String dbName, + String tblName, String filter, short max_parts) throws MetaException { // TODO Translate filter -> expr - return null; + return rawStore.listPartitionNamesByFilter(dbName, tblName, filter, max_parts); } @Override public void alterPartition(String dbName, String tblName, List partVals, Partition newPart) throws InvalidObjectException, MetaException { rawStore.alterPartition(dbName, tblName, partVals, newPart); - // Update partition cache + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInit(10000) + || !isCachedTable(dbName, tblName)) { + return; + } + // Update partition cache try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - sharedCache.alterPartitionInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partVals, newPart); + sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); } finally { partitionCacheLock.readLock().unlock(); } @@ -1086,8 +1186,7 @@ public void alterPartition(String dbName, String tblName, List partVals, // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - sharedCache.alterPartitionInColStatsCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partVals, newPart); + 
sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart); } finally { partitionColStatsCacheLock.readLock().unlock(); } @@ -1097,9 +1196,14 @@ public void alterPartition(String dbName, String tblName, List partVals, public void alterPartitions(String dbName, String tblName, List> partValsList, List newParts) throws InvalidObjectException, MetaException { rawStore.alterPartitions(dbName, tblName, partValsList, newParts); - // Update partition cache + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInit(10000) + || !isCachedTable(dbName, tblName)) { + return; + } + // Update partition cache try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); @@ -1107,8 +1211,7 @@ public void alterPartitions(String dbName, String tblName, List> pa for (int i = 0; i < partValsList.size(); i++) { List partVals = partValsList.get(i); Partition newPart = newParts.get(i); - sharedCache.alterPartitionInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partVals, newPart); + sharedCache.alterPartitionInCache(dbName, tblName, partVals, newPart); } } finally { partitionCacheLock.readLock().unlock(); @@ -1121,8 +1224,7 @@ public void alterPartitions(String dbName, String tblName, List> pa for (int i = 0; i < partValsList.size(); i++) { List partVals = partValsList.get(i); Partition newPart = newParts.get(i); - sharedCache.alterPartitionInColStatsCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partVals, newPart); + sharedCache.alterPartitionInColStatsCache(dbName, tblName, partVals, newPart); } } finally { partitionColStatsCacheLock.readLock().unlock(); @@ -1185,27 +1287,26 @@ private boolean getPartitionNamesPrunedByExprNoTxn(Table table, byte[] expr, 
public List getPartitionsByFilter(String dbName, String tblName, String filter, short maxParts) throws MetaException, NoSuchObjectException { - // TODO Auto-generated method stub - return null; + return rawStore.getPartitionsByFilter(dbName, tblName, filter, maxParts); } @Override public boolean getPartitionsByExpr(String dbName, String tblName, byte[] expr, - String defaultPartitionName, short maxParts, List result) - throws TException { + String defaultPartitionName, short maxParts, List result) throws TException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getPartitionsByExpr( - dbName, tblName, expr, defaultPartitionName, maxParts, result); + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { + return rawStore.getPartitionsByExpr(dbName, tblName, expr, defaultPartitionName, maxParts, + result); } List partNames = new LinkedList<>(); - Table table = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); - boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn( - table, expr, defaultPartitionName, maxParts, partNames, sharedCache); + Table table = sharedCache.getTableFromCache(dbName, tblName); + boolean hasUnknownPartitions = getPartitionNamesPrunedByExprNoTxn(table, expr, + defaultPartitionName, maxParts, partNames, sharedCache); for (String partName : partNames) { - Partition part = sharedCache.getPartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partNameToVals(partName)); + Partition part = sharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName)); part.unsetPrivileges(); result.add(part); } @@ -1213,31 +1314,26 @@ public boolean getPartitionsByExpr(String dbName, String tblName, byte[] expr, } @Override 
- public int getNumPartitionsByFilter(String dbName, String tblName, - String filter) throws MetaException, NoSuchObjectException { - // TODO filter -> expr - // SharedCache sharedCache = sharedCacheWrapper.get(); - // if (sharedCache == null) { - return rawStore.getNumPartitionsByFilter(dbName, tblName, filter); - // } - // Table table = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - // StringUtils.normalizeIdentifier(tblName)); - // return 0; + public int getNumPartitionsByFilter(String dbName, String tblName, String filter) + throws MetaException, NoSuchObjectException { + return rawStore.getNumPartitionsByFilter(dbName, tblName, filter); } @Override public int getNumPartitionsByExpr(String dbName, String tblName, byte[] expr) throws MetaException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { return rawStore.getNumPartitionsByExpr(dbName, tblName, expr); } String defaultPartName = MetastoreConf.getVar(getConf(), ConfVars.DEFAULTPARTITIONNAME); List partNames = new LinkedList<>(); - Table table = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); - getPartitionNamesPrunedByExprNoTxn( - table, expr, defaultPartName, Short.MAX_VALUE, partNames, sharedCache); + Table table = sharedCache.getTableFromCache(dbName, tblName); + getPartitionNamesPrunedByExprNoTxn(table, expr, defaultPartName, Short.MAX_VALUE, partNames, + sharedCache); return partNames.size(); } @@ -1254,14 +1350,16 @@ public int getNumPartitionsByExpr(String dbName, String tblName, byte[] expr) @Override public List getPartitionsByNames(String dbName, String tblName, List partNames) throws MetaException, NoSuchObjectException { + dbName = 
StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { return rawStore.getPartitionsByNames(dbName, tblName, partNames); } List partitions = new ArrayList<>(); for (String partName : partNames) { - Partition part = sharedCache.getPartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partNameToVals(partName)); + Partition part = sharedCache.getPartitionFromCache(dbName, tblName, partNameToVals(partName)); if (part!=null) { partitions.add(part); } @@ -1429,15 +1527,16 @@ public Role getRole(String roleName) throws NoSuchObjectException { public Partition getPartitionWithAuth(String dbName, String tblName, List partVals, String userName, List groupNames) throws MetaException, NoSuchObjectException, InvalidObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { return rawStore.getPartitionWithAuth(dbName, tblName, partVals, userName, groupNames); } - Partition p = sharedCache.getPartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), partVals); + Partition p = sharedCache.getPartitionFromCache(dbName, tblName, partVals); if (p!=null) { - Table t = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); + Table t = sharedCache.getTableFromCache(dbName, tblName); String partName = Warehouse.makePartName(t.getPartitionKeys(), partVals); PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, userName, groupNames); @@ 
-1450,16 +1549,17 @@ public Partition getPartitionWithAuth(String dbName, String tblName, public List getPartitionsWithAuth(String dbName, String tblName, short maxParts, String userName, List groupNames) throws MetaException, NoSuchObjectException, InvalidObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { return rawStore.getPartitionsWithAuth(dbName, tblName, maxParts, userName, groupNames); } - Table t = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); + Table t = sharedCache.getTableFromCache(dbName, tblName); List partitions = new ArrayList<>(); int count = 0; - for (Partition part : sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), maxParts)) { + for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) { if (maxParts == -1 || count < maxParts) { String partName = Warehouse.makePartName(t.getPartitionKeys(), part.getValues()); PrincipalPrivilegeSet privs = getPartitionPrivilegeSet(dbName, tblName, partName, @@ -1476,16 +1576,17 @@ public Partition getPartitionWithAuth(String dbName, String tblName, public List listPartitionNamesPs(String dbName, String tblName, List partVals, short maxParts) throws MetaException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { return rawStore.listPartitionNamesPs(dbName, tblName, partVals, maxParts); } List partNames = new ArrayList<>(); int count = 
0; - Table t = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); - for (Partition part : sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), maxParts)) { + Table t = sharedCache.getTableFromCache(dbName, tblName); + for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) { boolean psMatch = true; for (int i=0;i listPartitionsPsWithAuth(String dbName, - String tblName, List partVals, short maxParts, String userName, - List groupNames) + public List listPartitionsPsWithAuth(String dbName, String tblName, + List partVals, short maxParts, String userName, List groupNames) throws MetaException, InvalidObjectException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.listPartitionsPsWithAuth( - dbName, tblName, partVals, maxParts, userName, groupNames); + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { + return rawStore.listPartitionsPsWithAuth(dbName, tblName, partVals, maxParts, userName, + groupNames); } List partitions = new ArrayList<>(); - Table t = sharedCache.getTableFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName)); + Table t = sharedCache.getTableFromCache(dbName, tblName); int count = 0; - for (Partition part : sharedCache.listCachedPartitions(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), maxParts)) { + for (Partition part : sharedCache.listCachedPartitions(dbName, tblName, maxParts)) { boolean psMatch = true; - for (int i=0;i statsObjs = colStats.getStatsObj(); - Table tbl = getTable(dbName, tableName); + Table tbl = getTable(dbName, tblName); List colNames = new ArrayList<>(); for 
(ColumnStatisticsObj statsObj : statsObjs) { colNames.add(statsObj.getColName()); } StatsSetupConst.setColumnStatsState(tbl.getParameters(), colNames); - // Update table try { // Wait if background cache update is happening tableCacheLock.readLock().lock(); isTableCacheDirty.set(true); - sharedCache.alterTableInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), tbl); + sharedCache.alterTableInCache(dbName, tblName, tbl); } finally { tableCacheLock.readLock().unlock(); } - // Update table col stats try { // Wait if background cache update is happening tableColStatsCacheLock.readLock().lock(); isTableColStatsCacheDirty.set(true); - sharedCache.updateTableColStatsInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), statsObjs); + sharedCache.updateTableColStatsInCache(dbName, tblName, statsObjs); } finally { tableColStatsCacheLock.readLock().unlock(); } @@ -1588,18 +1688,19 @@ public boolean updateTableColumnStatistics(ColumnStatistics colStats) } @Override - public ColumnStatistics getTableColumnStatistics(String dbName, String tableName, + public ColumnStatistics getTableColumnStatistics(String dbName, String tblName, List colNames) throws MetaException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTableColumnStatistics(dbName, tableName, colNames); + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { + return rawStore.getTableColumnStatistics(dbName, tblName, colNames); } - ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tableName); + ColumnStatisticsDesc csd = new ColumnStatisticsDesc(true, dbName, tblName); List colStatObjs = new ArrayList<>(); for (String colName : colNames) { - String colStatsCacheKey = - 
CacheUtils.buildKey(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), colName); + String colStatsCacheKey = CacheUtils.buildKey(dbName, tblName, colName); ColumnStatisticsObj colStat = sharedCache.getCachedTableColStats(colStatsCacheKey); if (colStat != null) { colStatObjs.add(colStat); @@ -1613,18 +1714,22 @@ public ColumnStatistics getTableColumnStatistics(String dbName, String tableName } @Override - public boolean deleteTableColumnStatistics(String dbName, String tableName, String colName) + public boolean deleteTableColumnStatistics(String dbName, String tblName, String colName) throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { - boolean succ = rawStore.deleteTableColumnStatistics(dbName, tableName, colName); + boolean succ = rawStore.deleteTableColumnStatistics(dbName, tblName, colName); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInit(10000) + || !isCachedTable(dbName, tblName)) { + return succ; + } try { // Wait if background cache update is happening tableColStatsCacheLock.readLock().lock(); isTableColStatsCacheDirty.set(true); - sharedCache.removeTableColStatsFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), colName); + sharedCache.removeTableColStatsFromCache(dbName, tblName, colName); } finally { tableColStatsCacheLock.readLock().unlock(); } @@ -1637,37 +1742,35 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List statsObjs = colStats.getStatsObj(); - Partition part = getPartition(dbName, tableName, partVals); + Partition part = getPartition(dbName, tblName, partVals); List colNames = new ArrayList<>(); for (ColumnStatisticsObj statsObj : statsObjs) { 
colNames.add(statsObj.getColName()); } StatsSetupConst.setColumnStatsState(part.getParameters(), colNames); - // Update partition try { // Wait if background cache update is happening partitionCacheLock.readLock().lock(); isPartitionCacheDirty.set(true); - sharedCache.alterPartitionInCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), partVals, part); + sharedCache.alterPartitionInCache(dbName, tblName, partVals, part); } finally { partitionCacheLock.readLock().unlock(); } - // Update partition column stats try { // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - sharedCache.updatePartitionColStatsInCache( - StringUtils.normalizeIdentifier(colStats.getStatsDesc().getDbName()), - StringUtils.normalizeIdentifier(colStats.getStatsDesc().getTableName()), partVals, + sharedCache.updatePartitionColStatsInCache(dbName, tblName, partVals, colStats.getStatsObj()); } finally { partitionColStatsCacheLock.readLock().unlock(); @@ -1678,27 +1781,30 @@ public boolean updatePartitionColumnStatistics(ColumnStatistics colStats, List getPartitionColumnStatistics(String dbName, String tblName, List partNames, List colNames) throws MetaException, NoSuchObjectException { return rawStore.getPartitionColumnStatistics(dbName, tblName, partNames, colNames); } @Override - public boolean deletePartitionColumnStatistics(String dbName, String tableName, String partName, - List partVals, String colName) throws NoSuchObjectException, MetaException, - InvalidObjectException, InvalidInputException { + public boolean deletePartitionColumnStatistics(String dbName, String tblName, String partName, + List partVals, String colName) + throws NoSuchObjectException, MetaException, InvalidObjectException, InvalidInputException { boolean succ = - rawStore.deletePartitionColumnStatistics(dbName, tableName, partName, partVals, colName); + 
rawStore.deletePartitionColumnStatistics(dbName, tblName, partName, partVals, colName); if (succ) { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return true; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInit(10000) + || !isCachedTable(dbName, tblName)) { + return succ; + } try { // Wait if background cache update is happening partitionColStatsCacheLock.readLock().lock(); isPartitionColStatsCacheDirty.set(true); - sharedCache.removePartitionColStatsFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tableName), partVals, colName); + sharedCache.removePartitionColStatsFromCache(dbName, tblName, partVals, colName); } finally { partitionColStatsCacheLock.readLock().unlock(); } @@ -1708,14 +1814,17 @@ public boolean deletePartitionColumnStatistics(String dbName, String tableName, @Override public AggrStats get_aggr_stats_for(String dbName, String tblName, List partNames, - List colNames) throws MetaException, NoSuchObjectException { + List colNames) throws MetaException, NoSuchObjectException { + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { + if ((sharedCache == null) || !sharedCacheWrapper.isInitialized() + || !isCachedTable(dbName, tblName)) { return rawStore.get_aggr_stats_for(dbName, tblName, partNames, colNames); } - List colStats = mergeColStatsForPartitions( - StringUtils.normalizeIdentifier(dbName), StringUtils.normalizeIdentifier(tblName), - partNames, colNames, sharedCache); + List colStats = + mergeColStatsForPartitions(StringUtils.normalizeIdentifier(dbName), + StringUtils.normalizeIdentifier(tblName), partNames, colNames, sharedCache); return new AggrStats(colStats, partNames.size()); } @@ -1821,8 +1930,13 @@ public void 
setMetaStoreSchemaVersion(String version, String comment) public void dropPartitions(String dbName, String tblName, List partNames) throws MetaException, NoSuchObjectException { rawStore.dropPartitions(dbName, tblName, partNames); + dbName = StringUtils.normalizeIdentifier(dbName); + tblName = StringUtils.normalizeIdentifier(tblName); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return; + if ((sharedCache == null) || !sharedCacheWrapper.waitForInit(10000) + || !isCachedTable(dbName, tblName)) { + return; + } // Remove partitions try { // Wait if background cache update is happening @@ -1830,8 +1944,7 @@ public void dropPartitions(String dbName, String tblName, List partNames isPartitionCacheDirty.set(true); for (String partName : partNames) { List vals = partNameToVals(partName); - sharedCache.removePartitionFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), vals); + sharedCache.removePartitionFromCache(dbName, tblName, vals); } } finally { partitionCacheLock.readLock().unlock(); @@ -1843,8 +1956,7 @@ public void dropPartitions(String dbName, String tblName, List partNames isPartitionColStatsCacheDirty.set(true); for (String partName : partNames) { List part_vals = partNameToVals(partName); - sharedCache.removePartitionColStatsFromCache(StringUtils.normalizeIdentifier(dbName), - StringUtils.normalizeIdentifier(tblName), part_vals); + sharedCache.removePartitionColStatsFromCache(dbName, tblName, part_vals); } } finally { partitionColStatsCacheLock.readLock().unlock(); @@ -2017,29 +2129,17 @@ public FileMetadataHandler getFileMetadataHandler(FileMetadataExprType type) { @Override public int getTableCount() throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getTableCount(); - } - return sharedCache.getCachedTableCount(); + return rawStore.getTableCount(); } @Override public int getPartitionCount() throws MetaException { 
- SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getPartitionCount(); - } - return sharedCache.getCachedPartitionCount(); + return rawStore.getPartitionCount(); } @Override public int getDatabaseCount() throws MetaException { - SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) { - return rawStore.getDatabaseCount(); - } - return sharedCache.getCachedDatabaseCount(); + return rawStore.getDatabaseCount(); } @Override @@ -2072,16 +2172,19 @@ public int getDatabaseCount() throws MetaException { } @Override - public List createTableWithConstraints(Table tbl, - List primaryKeys, List foreignKeys, - List uniqueConstraints, - List notNullConstraints) - throws InvalidObjectException, MetaException { + public List createTableWithConstraints(Table tbl, List primaryKeys, + List foreignKeys, List uniqueConstraints, + List notNullConstraints) throws InvalidObjectException, MetaException { // TODO constraintCache - List constraintNames = rawStore.createTableWithConstraints(tbl, primaryKeys, foreignKeys, - uniqueConstraints, notNullConstraints); + List constraintNames = rawStore.createTableWithConstraints(tbl, primaryKeys, + foreignKeys, uniqueConstraints, notNullConstraints); + String dbName = StringUtils.normalizeIdentifier(tbl.getDbName()); + String tblName = StringUtils.normalizeIdentifier(tbl.getTableName()); SharedCache sharedCache = sharedCacheWrapper.get(); - if (sharedCache == null) return constraintNames; + if ((sharedCache == null) || !shouldCacheTable(dbName, tblName) + || !sharedCacheWrapper.waitForInit(10000)) { + return constraintNames; + } sharedCache.addTableToCache(StringUtils.normalizeIdentifier(tbl.getDbName()), StringUtils.normalizeIdentifier(tbl.getTableName()), tbl); return constraintNames; @@ -2152,7 +2255,7 @@ public String getMetastoreDbUuid() throws MetaException { private final SharedCache instance = new SharedCache(); private final Object initLock = new Object(); - private 
InitState initState = InitState.NOT_ENABLED; + private volatile InitState initState = InitState.NOT_ENABLED; // We preserve the old setConf init behavior, where a failed prewarm would fail the query // and give a chance to another query to try prewarming again. Basically, we'd increment the // count and all the queries waiting for prewarm would fail; however, we will retry the prewarm @@ -2171,7 +2274,7 @@ void updateInitState(Throwable error, boolean isFatal) { if (isSuccessful) { initState = InitState.INITIALIZED; } else if (isFatal) { - initState = InitState.FAILED_FATAL; + initState = InitState.FAILED_FATAL; lastError = error; } else { ++initFailureCount; @@ -2196,7 +2299,7 @@ void startInit(Configuration conf) { * Fails on any initialization error, even if the init will be retried later. */ public SharedCache get() throws MetaException { - if (!waitForInit()) return null; + if (!waitForInit(100)) return null; return instance; } @@ -2205,7 +2308,7 @@ SharedCache getUnsafe() { return instance; } - private boolean waitForInit() throws MetaException { + private boolean waitForInit(long timeout) throws MetaException { synchronized (initLock) { int localFailureCount = initFailureCount; while (true) { @@ -2217,7 +2320,7 @@ private boolean waitForInit() throws MetaException { } case INITIALIZING: { try { - initLock.wait(100); + initLock.wait(timeout); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new MetaException("Interrupted"); @@ -2234,6 +2337,20 @@ private boolean waitForInit() throws MetaException { } } } + + /** + * Notify all threads blocked on initialization, to continue work. (We allow read while prewarm + * is running; all write calls are blocked using waitForInitAndBlock). 
+     */
+    void notifyAllBlocked() {
+      synchronized (initLock) {
+        initLock.notifyAll();
+      }
+    }
+
+    boolean isInitialized() {
+      return initState == InitState.INITIALIZED;
+    }
   }
 
   @VisibleForTesting
@@ -2305,4 +2422,82 @@ public void dropWMTrigger(String resourcePlanName, String triggerName)
       throws NoSuchObjectException, MetaException {
     return rawStore.getTriggersForResourcePlan(resourcePlanName);
   }
+
+  // NOTE: despite its name, this returns false when the table MATCHES a blacklist pattern and
+  // true when it matches none; shouldCacheTable relies on that polarity.
+  static boolean isInBlacklist(String dbName, String tblName) {
+    String str = dbName + "." + tblName;
+    for (Pattern pattern : blacklistPatterns) {
+      LOG.debug("Trying to match: {} against blacklist pattern: {}", str, pattern);
+      Matcher matcher = pattern.matcher(str);
+      if (matcher.matches()) {
+        LOG.debug("Found matcher group: {} at start index: {} and end index: {}", matcher.group(),
+            matcher.start(), matcher.end());
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // Returns true when "dbName.tblName" fully matches at least one whitelist pattern.
+  static boolean isInWhitelist(String dbName, String tblName) {
+    String str = dbName + "." + tblName;
+    for (Pattern pattern : whitelistPatterns) {
+      LOG.debug("Trying to match: {} against whitelist pattern: {}", str, pattern);
+      Matcher matcher = pattern.matcher(str);
+      if (matcher.matches()) {
+        LOG.debug("Found matcher group: {} at start index: {} and end index: {}", matcher.group(),
+            matcher.start(), matcher.end());
+        return true;
+      }
+    }
+    return false;
+  }
+
+  // For testing
+  static void setWhitelistPattern(List<Pattern> patterns) {
+    whitelistPatterns = patterns;
+  }
+
+  // For testing
+  static void setBlacklistPattern(List<Pattern> patterns) {
+    blacklistPatterns = patterns;
+  }
+
+  // Determines if we should cache a table (& its partitions, stats etc),
+  // based on whitelist/blacklist
+  static boolean shouldCacheTable(String dbName, String tblName) {
+    boolean blacklistCheck = isInBlacklist(dbName, tblName);
+    LOG.debug("Blacklist check: {}", blacklistCheck);
+    boolean whitelistCheck = isInWhitelist(dbName, tblName);
+    LOG.debug("Whitelist check: {}", whitelistCheck);
+    return blacklistCheck && whitelistCheck;
+  }
+
+  // Checks if the table exists in cache.
+  // If it does, we have also cached its partitions, stats etc
+  // Faster than shouldCacheTable, so we use this in most read queries,
+  // to decide if we can serve metadata from cache
+  private boolean isCachedTable(String dbName, String tblName) throws MetaException {
+    SharedCache sharedCache = sharedCacheWrapper.get();
+    // Direct null tests: a table that is not cached is expected to come back as null.
+    return (sharedCache != null) && (sharedCache.getTableFromCache(dbName, tblName) != null);
+  }
+
+  // Compiles every comma separated entry of configStr into a Pattern.
+  // Entries are trimmed and blank entries are skipped; a null configStr gives an empty list.
+  static List<Pattern> createPatterns(String configStr) {
+    List<Pattern> patterns = new ArrayList<Pattern>();
+    if (configStr == null) {
+      return patterns;
+    }
+    List<String> patternStrs = Arrays.asList(configStr.split(","));
+    for (String str : patternStrs) {
+      String trimmed = str.trim();
+      if (!trimmed.isEmpty()) {
+        patterns.add(Pattern.compile(trimmed));
+      }
+    }
+    return patterns;
+  }
 }
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
index a76b8480b0..6f871b32e0 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/SharedCache.java
@@ -318,6 +318,28 @@ public synchronized Partition removePartitionFromCache(String dbName, String tbl
     return wrapper.getPartition();
   }
 
+  /**
+   * Given a db + table, remove all partitions for this table from the cache
+   * @param dbName
+   * @param tblName
+   */
+  public synchronized void removePartitionsFromCache(String dbName, String tblName) {
+    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName).toLowerCase();
+    Iterator<Entry<String, PartitionWrapper>> iterator = partitionCache.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Entry<String, PartitionWrapper> entry = iterator.next();
+      if (!entry.getKey().toLowerCase().startsWith(partialKey)) {
+        // Partition of another table: leave the entry and its SD count untouched.
+        continue;
+      }
+      PartitionWrapper wrapper = entry.getValue();
+      if (wrapper.getSdHash() != null) {
+        decrSd(wrapper.getSdHash());
+      }
+      iterator.remove();
+    }
+  }
+
   // Remove cached column stats for all partitions of a table
   public synchronized void removePartitionColStatsFromCache(String dbName, String tblName) {
     String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName);
@@ -353,6 +375,23 @@ public synchronized void removePartitionColStatsFromCache(String dbName, String
     partitionColStatsCache.remove(CacheUtils.buildKey(dbName, tblName, partVals, colName));
   }
 
+  /**
+   * Given a db + table, remove all partition col stats for this table from the cache
+   *
+   * @param dbName
+   * @param tblName
+   */
+  public synchronized void removePartitionsColStatsFromCache(String dbName, String tblName) {
+    String partialKey = CacheUtils.buildKeyWithDelimit(dbName, tblName).toLowerCase();
+    // Only keys are inspected, so iterate the key view; removing through it drops the mapping.
+    Iterator<String> keys = partitionColStatsCache.keySet().iterator();
+    while (keys.hasNext()) {
+      if (keys.next().toLowerCase().startsWith(partialKey)) {
+        keys.remove();
+      }
+    }
+  }
+
   public synchronized List<Partition> listCachedPartitions(String dbName, String tblName, int max) {
     List<Partition> partitions = new ArrayList<>();
     int count = 0;
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 8e35d4434f..8194afd019 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -273,6 +273,17 @@ public static ConfVars getMetaConf(String name) {
     CACHED_RAW_STORE_CACHE_UPDATE_FREQUENCY("metastore.cached.rawstore.cache.update.frequency",
         "hive.metastore.cached.rawstore.cache.update.frequency", 60, TimeUnit.SECONDS,
         "The time after which metastore cache is updated from metastore DB."),
+    CACHED_RAW_STORE_CACHED_OBJECTS_WHITELIST("metastore.cached.rawstore.cached.object.whitelist",
+        "hive.metastore.cached.rawstore.cached.object.whitelist", ".*", "Comma separated list of regular expressions \n " +
+        "to select the tables (and its partitions, stats etc) that will be cached by CachedStore. \n" +
+        "This can be used in conjunction with hive.metastore.cached.rawstore.cached.object.blacklist. \n" +
+        "Example: .*, db1.*, db2\\.tbl.*. The last item can potentially override patterns specified before."),
+    CACHED_RAW_STORE_CACHED_OBJECTS_BLACKLIST("metastore.cached.rawstore.cached.object.blacklist",
+        "hive.metastore.cached.rawstore.cached.object.blacklist", "", "Comma separated list of regular expressions \n " +
+        "to filter out the tables (and its partitions, stats etc) that will be cached by CachedStore. \n" +
+        "This can be used in conjunction with hive.metastore.cached.rawstore.cached.object.whitelist. \n" +
+        "Example: db2.*, db3\\.tbl1, db3\\..*. The last item can potentially override patterns specified before. \n" +
+        "The blacklist also overrides the whitelist."),
     CAPABILITY_CHECK("metastore.client.capability.check",
         "hive.metastore.client.capability.check", true,
         "Whether to check client capabilities for potentially breaking API usage."),